Class: Bluepill::Process

Inherits:
Object
  • Object
show all
Defined in:
lib/bluepill/process.rb

Constant Summary collapse

CONFIGURABLE_ATTRIBUTES =
[
  :pre_start_command,
  :start_command,
  :stop_command,
  :restart_command,

  :stdout,
  :stderr,
  :stdin,

  :daemonize,
  :pid_file,
  :working_dir,
  :environment,

  :start_grace_time,
  :stop_grace_time,
  :restart_grace_time,

  :uid,
  :gid,

  :cache_actual_pid,

  :monitor_children,
  :child_process_factory,

  :pid_command,
  :auto_start,

  :supplementary_groups,

  :stop_signals,

  :on_start_timeout,

  :group_start_noblock,
  :group_restart_noblock,
  :group_stop_noblock,
  :group_unmonitor_noblock

]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(process_name, checks, options = {}) ⇒ Process

Returns a new instance of Process.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/bluepill/process.rb', line 114

def initialize(process_name, checks, options = {})
  @name = process_name
  @event_mutex = Monitor.new
  @watches = []
  @triggers = []
  @children = []
  @threads = []
  @statistics = ProcessStatistics.new
  @actual_pid = options[:actual_pid]
  self.logger = options[:logger]

  checks.each do |name, opts|
    if Trigger[name]
      self.add_trigger(name, opts)
    else
      self.add_watch(name, opts)
    end
  end

  # These defaults are overriden below if it's configured to be something else.
  @monitor_children =  false
  @cache_actual_pid = true
  @start_grace_time = @stop_grace_time = @restart_grace_time = 3
  @environment = {}
  @on_start_timeout = "start"
  @group_start_noblock = @group_stop_noblock = @group_restart_noblock = @group_unmonitor_noblock = true

  CONFIGURABLE_ATTRIBUTES.each do |attribute_name|
    self.send("#{attribute_name}=", options[attribute_name]) if options.has_key?(attribute_name)
  end

  # Let state_machine do its initialization stuff
  super() # no arguments intentional
end

Instance Attribute Details

#childrenObject (readonly)

Returns the value of attribute children.



58
59
60
# File 'lib/bluepill/process.rb', line 58

def children
  @children
end

#loggerObject

Returns the value of attribute logger.



56
57
58
# File 'lib/bluepill/process.rb', line 56

def logger
  @logger
end

#nameObject

Returns the value of attribute name.



56
57
58
# File 'lib/bluepill/process.rb', line 56

def name
  @name
end

#process_runningObject

Returns the value of attribute process_running.



56
57
58
# File 'lib/bluepill/process.rb', line 56

def process_running
  @process_running
end

#skip_ticks_untilObject

Returns the value of attribute skip_ticks_until.



56
57
58
# File 'lib/bluepill/process.rb', line 56

def skip_ticks_until
  @skip_ticks_until
end

#statisticsObject (readonly)

Returns the value of attribute statistics.



58
59
60
# File 'lib/bluepill/process.rb', line 58

def statistics
  @statistics
end

#triggersObject

Returns the value of attribute triggers.



56
57
58
# File 'lib/bluepill/process.rb', line 56

def triggers
  @triggers
end

#watchesObject

Returns the value of attribute watches.



56
57
58
# File 'lib/bluepill/process.rb', line 56

def watches
  @watches
end

Instance Method Details

#actual_pidObject



419
420
421
# File 'lib/bluepill/process.rb', line 419

def actual_pid
  pid_command ? pid_from_command : pid_from_file
end

#actual_pid=(pid) ⇒ Object



443
444
445
446
# File 'lib/bluepill/process.rb', line 443

def actual_pid=(pid)
  ProcessJournal.append_pid_to_journal(name, pid) # be sure to always log the pid
  @actual_pid = pid
end

#add_trigger(name, options = {}) ⇒ Object



211
212
213
# File 'lib/bluepill/process.rb', line 211

def add_trigger(name, options = {})
  self.triggers << Trigger[name].new(self, options.merge(:logger => self.logger))
end

#add_watch(name, options = {}) ⇒ Object

Watch related methods



207
208
209
# File 'lib/bluepill/process.rb', line 207

def add_watch(name, options = {})
  self.watches << ConditionWatch.new(name, options.merge(:logger => self.logger))
end

#cache_actual_pid?Boolean

Returns:

  • (Boolean)


415
416
417
# File 'lib/bluepill/process.rb', line 415

def cache_actual_pid?
  !!@cache_actual_pid
end

#clean_threadsObject



393
394
395
396
# File 'lib/bluepill/process.rb', line 393

def clean_threads
  @threads.each { |t| t.kill }
  @threads.clear
end

#clear_pidObject



448
449
450
# File 'lib/bluepill/process.rb', line 448

def clear_pid
  @actual_pid = nil
end

#daemonize?Boolean

Returns:

  • (Boolean)


398
399
400
# File 'lib/bluepill/process.rb', line 398

def daemonize?
  !!self.daemonize
end

#determine_initial_stateObject



239
240
241
242
243
244
245
# File 'lib/bluepill/process.rb', line 239

def determine_initial_state
  if self.process_running?(true)
    self.state = 'up'
  else
    self.state = (auto_start == false) ? 'unmonitored' : 'down' # we need to check for false value
  end
end

#dispatch!(event, reason = nil) ⇒ Object

State machine methods



179
180
181
182
183
184
# File 'lib/bluepill/process.rb', line 179

def dispatch!(event, reason = nil)
  @event_mutex.synchronize do
    @statistics.record_event(event, reason)
    self.send("#{event}")
  end
end

#handle_user_command(cmd) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/bluepill/process.rb', line 247

def handle_user_command(cmd)
  case cmd
  when "start"
    if self.process_running?(true)
      logger.warning("Refusing to re-run start command on an already running process.")
    else
      dispatch!(:start, "user initiated")
    end
  when "stop"
    stop_process
    dispatch!(:unmonitor, "user initiated")
  when "restart"
    restart_process
  when "unmonitor"
    # When the user issues an unmonitor cmd, reset any triggers so that
    # scheduled events gets cleared
    triggers.each {|t| t.reset! }
    dispatch!(:unmonitor, "user initiated")
  end
end

#monitor_children?Boolean

Returns:

  • (Boolean)


402
403
404
# File 'lib/bluepill/process.rb', line 402

def monitor_children?
  !!self.monitor_children
end

#notify_triggers(transition) ⇒ Object



202
203
204
# File 'lib/bluepill/process.rb', line 202

def notify_triggers(transition)
  self.triggers.each {|trigger| trigger.notify(transition)}
end

#pid_from_commandObject



438
439
440
441
# File 'lib/bluepill/process.rb', line 438

def pid_from_command
  pid = %x{#{pid_command}}.strip
  (pid =~ /\A\d+\z/) ? pid.to_i : nil
end

#pid_from_fileObject



423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'lib/bluepill/process.rb', line 423

def pid_from_file
  return @actual_pid if cache_actual_pid? && @actual_pid
  @actual_pid = begin
    if pid_file
      if File.exists?(pid_file)
        str = File.read(pid_file)
        str.to_i if str.size > 0
      else
        logger.warning("pid_file #{pid_file} does not exist or cannot be read")
        nil
      end
    end
  end
end

#pre_start_processObject



306
307
308
309
310
311
312
313
314
# File 'lib/bluepill/process.rb', line 306

def pre_start_process
  return unless pre_start_command
  logger.warning "Executing pre start command: #{pre_start_command}"
  result = System.execute_blocking(pre_start_command, self.system_command_options)
  unless result[:exit_code].zero?
    logger.warning "Pre start command execution returned non-zero exit code:"
    logger.warning result.inspect
  end
end

#prepare_command(command) ⇒ Object



489
490
491
# File 'lib/bluepill/process.rb', line 489

def prepare_command(command)
  command.to_s.gsub("{{PID}}", actual_pid.to_s)
end

#process_running?(force = false) ⇒ Boolean

System Process Methods

Returns:

  • (Boolean)


269
270
271
272
273
274
275
276
# File 'lib/bluepill/process.rb', line 269

def process_running?(force = false)
  @process_running = nil if force # clear existing state if forced

  @process_running ||= signal_process(0)
  # the process isn't running, so we should clear the PID
  self.clear_pid unless @process_running
  @process_running
end

#record_transition(transition) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/bluepill/process.rb', line 186

def record_transition(transition)
  unless transition.loopback?
    @transitioned = true

    # When a process changes state, we should clear the memory of all the watches
    self.watches.each { |w| w.clear_history! }

    # Also, when a process changes state, we should re-populate its child list
    if self.monitor_children?
      self.logger.warning "Clearing child list"
      self.children.clear
    end
    logger.info "Going from #{transition.from_name} => #{transition.to_name}"
  end
end

#refresh_children!Object



467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/bluepill/process.rb', line 467

def refresh_children!
  # First prune the list of dead children
  @children.delete_if {|child| !child.process_running?(true) }

  # Add new found children to the list
  new_children_pids = System.get_children(self.actual_pid) - @children.map {|child| child.actual_pid}

  unless new_children_pids.empty?
    logger.info "Existing children: #{@children.collect{|c| c.actual_pid}.join(",")}. Got new children: #{new_children_pids.inspect} for #{actual_pid}"
  end

  # Construct a new process wrapper for each new found children
  new_children_pids.each do |child_pid|
    ProcessJournal.append_pid_to_journal(name, child_pid)
    child_name = "<child(pid:#{child_pid})>"
    logger = self.logger.prefix_with(child_name)

    child = self.child_process_factory.create_child_process(child_name, child_pid, logger)
    @children << child
  end
end

#restart_processObject



370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/bluepill/process.rb', line 370

def restart_process
  if restart_command
    cmd = self.prepare_command(restart_command)

    logger.warning "Executing restart command: #{cmd}"

    with_timeout(restart_grace_time, "restart") do
      result = System.execute_blocking(cmd, self.system_command_options)

      unless result[:exit_code].zero?
        logger.warning "Restart command execution returned non-zero exit code:"
        logger.warning result.inspect
      end
    end

    self.skip_ticks_for(restart_grace_time)
  else
    logger.warning "No restart_command specified. Must stop and start to restart"
    self.stop_process
    self.start_process
  end
end

#run_watchesObject



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/bluepill/process.rb', line 215

def run_watches
  now = Time.now.to_i

  threads = self.watches.collect do |watch|
    [watch, Thread.new { Thread.current[:events] = watch.run(self.actual_pid, now) }]
  end

  @transitioned = false

  threads.inject([]) do |events, (watch, thread)|
    thread.join
    if thread[:events].size > 0
      logger.info "#{watch.name} dispatched: #{thread[:events].join(',')}"
      thread[:events].each do |event|
        events << [event, watch.to_s]
      end
    end
    events
  end.each do |(event, reason)|
    break if @transitioned
    self.dispatch!(event, reason)
  end
end

#signal_process(code) ⇒ Object



406
407
408
409
410
411
412
413
# File 'lib/bluepill/process.rb', line 406

def signal_process(code)
  code = code.to_s.upcase if code.is_a?(String) || code.is_a?(Symbol)
  ::Process.kill(code, actual_pid)
  true
rescue Exception => e
  logger.err "Failed to signal process #{actual_pid} with code #{code}: #{e}"
  false
end

#skip_ticks_for(seconds) ⇒ Object

Internal State Methods



457
458
459
460
461
# File 'lib/bluepill/process.rb', line 457

def skip_ticks_for(seconds)
  # TODO: should this be addative or longest wins?
  #       i.e. if two calls for skip_ticks_for come in for 5 and 10, should it skip for 10 or 15?
  self.skip_ticks_until = (self.skip_ticks_until || Time.now.to_i) + seconds.to_i
end

#skipping_ticks?Boolean

Returns:

  • (Boolean)


463
464
465
# File 'lib/bluepill/process.rb', line 463

def skipping_ticks?
  self.skip_ticks_until && self.skip_ticks_until > Time.now.to_i
end

#start_processObject



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/bluepill/process.rb', line 278

def start_process
  ProcessJournal.kill_all_from_journal(name) # be sure nothing else is running from previous runs
  pre_start_process
  logger.warning "Executing start command: #{start_command}"
  if self.daemonize?
    daemon_id = System.daemonize(start_command, self.system_command_options)
    if daemon_id
      ProcessJournal.append_pid_to_journal(name, daemon_id)
      children.each {|child|
        ProcessJournal.append_pid_to_journal(name, child.actual_id)
      } if self.monitor_children?
    end
    daemon_id
  else
    # This is a self-daemonizing process
    with_timeout(start_grace_time, on_start_timeout) do
      result = System.execute_blocking(start_command, self.system_command_options)

      unless result[:exit_code].zero?
        logger.warning "Start command execution returned non-zero exit code:"
        logger.warning result.inspect
      end
    end
  end

  self.skip_ticks_for(start_grace_time)
end

#stop_processObject



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/bluepill/process.rb', line 316

def stop_process
  if monitor_children
    System.get_children(self.actual_pid).each do |child_pid|
      ProcessJournal.append_pid_to_journal(name, child_pid)
    end
  end

  if stop_command
    cmd = self.prepare_command(stop_command)
    logger.warning "Executing stop command: #{cmd}"

    with_timeout(stop_grace_time, "stop") do
      result = System.execute_blocking(cmd, self.system_command_options)

      unless result[:exit_code].zero?
        logger.warning "Stop command execution returned non-zero exit code:"
        logger.warning result.inspect
      end
    end

  elsif stop_signals
    # issue stop signals with configurable delay between each
    logger.warning "Sending stop signals to #{actual_pid}"
    @threads << Thread.new(self, stop_signals.clone) do |process, stop_signals|
      signal = stop_signals.shift
      logger.info "Sending signal #{signal} to #{process.actual_pid}"
      process.signal_process(signal) # send first signal

      until stop_signals.empty?
        # we already checked to make sure stop_signals had an odd number of items
        delay = stop_signals.shift
        signal = stop_signals.shift

        logger.debug "Sleeping for #{delay} seconds"
        sleep delay
        #break unless signal_process(0) #break unless the process can be reached
        unless process.signal_process(0)
          logger.debug "Process has terminated."
          break
        end
        logger.info "Sending signal #{signal} to #{process.actual_pid}"
        process.signal_process(signal)
      end
    end
  else
    logger.warning "Executing default stop command. Sending TERM signal to #{actual_pid}"
    signal_process("TERM")
  end
  ProcessJournal.kill_all_from_journal(name) # finish cleanup
  self.unlink_pid # TODO: we only write the pid file if we daemonize, should we only unlink it if we daemonize?

  self.skip_ticks_for(stop_grace_time)
end

#system_command_optionsObject



493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/bluepill/process.rb', line 493

def system_command_options
  {
    :uid         => self.uid,
    :gid         => self.gid,
    :working_dir => self.working_dir,
    :environment => self.environment,
    :pid_file    => self.pid_file,
    :logger      => self.logger,
    :stdin       => self.stdin,
    :stdout      => self.stdout,
    :stderr      => self.stderr,
    :supplementary_groups => self.supplementary_groups
  }
end

#tickObject



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/bluepill/process.rb', line 149

def tick
  return if self.skipping_ticks?
  self.skip_ticks_until = nil

  # clear the memoization per tick
  @process_running = nil

  # Deal with thread cleanup here since the stopping state isn't used
  clean_threads if self.unmonitored?

  # run state machine transitions
  super

  if self.up?
    self.run_watches

    if self.monitor_children?
      refresh_children!
      children.each {|child| child.tick}
    end
  end
end


452
453
454
# File 'lib/bluepill/process.rb', line 452

def unlink_pid
  System.delete_if_exists(pid_file)
end

#with_timeout(secs, next_state = nil, &blk) ⇒ Object



508
509
510
511
512
513
514
515
516
517
518
# File 'lib/bluepill/process.rb', line 508

def with_timeout(secs, next_state = nil, &blk)
  # Attempt to execute the passed block. If the block takes
  # too long, transition to the indicated next state.
  begin
    Timeout.timeout(secs.to_f, &blk)
  rescue Timeout::Error
    logger.err "Execution is taking longer than expected."
    logger.err "Did you forget to tell bluepill to daemonize this process?"
    dispatch!(next_state)
  end
end