Class: Raemon::Master

Inherits:
Object
  • Object
show all
Defined in:
lib/raemon/master.rb

Constant Summary collapse

WORKERS =
{}
SELF_PIPE =
[]
SIG_QUEUE =
[]
CHUNK_SIZE =
(16*1024)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Master

Returns a new instance of Master.



27
28
29
30
31
32
33
34
35
36
# File 'lib/raemon/master.rb', line 27

def initialize(options={})
  @detach         = options[:detach] || false
  @name           = options[:name] || 'Raemon'
  @pid_file       = options[:pid_file]
  @logger         = options[:logger] || Logger.new(STDOUT)
  @timeout        = options[:timeout] || 180 # 3 minutes
  @memory_limit   = options[:memory_limit] # in MB
  
  daemonize if @detach
end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



11
12
13
# File 'lib/raemon/master.rb', line 11

def logger
  @logger
end

#master_pidObject

Returns the value of attribute master_pid.



11
12
13
# File 'lib/raemon/master.rb', line 11

def master_pid
  @master_pid
end

#memory_limitObject

Returns the value of attribute memory_limit.



11
12
13
# File 'lib/raemon/master.rb', line 11

def memory_limit
  @memory_limit
end

#nameObject

Returns the value of attribute name.



11
12
13
# File 'lib/raemon/master.rb', line 11

def name
  @name
end

#num_workersObject

Returns the value of attribute num_workers.



11
12
13
# File 'lib/raemon/master.rb', line 11

def num_workers
  @num_workers
end

#pid_fileObject

Returns the value of attribute pid_file.



11
12
13
# File 'lib/raemon/master.rb', line 11

def pid_file
  @pid_file
end

#timeoutObject

Returns the value of attribute timeout.



11
12
13
# File 'lib/raemon/master.rb', line 11

def timeout
  @timeout
end

#worker_klassObject

Returns the value of attribute worker_klass.



11
12
13
# File 'lib/raemon/master.rb', line 11

def worker_klass
  @worker_klass
end

Class Method Details

.start(num_workers, worker_klass, options = {}) ⇒ Object



15
16
17
18
# File 'lib/raemon/master.rb', line 15

def self.start(num_workers, worker_klass, options={})
  master = new(options)
  master.start(num_workers, worker_klass)
end

.stop(options = {}) ⇒ Object



20
21
22
23
24
25
# File 'lib/raemon/master.rb', line 20

def self.stop(options={})
  pid_file = options[:pid_file]
  pid = options[:pid] || (File.read(pid_file).to_i rescue 0)
  Process.kill('QUIT', pid) if pid > 0
rescue Errno::ESRCH
end

Instance Method Details

#start(num_workers, worker_klass) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/raemon/master.rb', line 38

def start(num_workers, worker_klass)
  logger.info "=> Starting #{name} with #{num_workers} worker(s)"

  @master_pid   = $$
  @num_workers  = num_workers
  @worker_klass = worker_klass
  
  # Check if the worker implements our interface
  if !worker_klass.include?(Raemon::Worker)
    logger.error "** Invalid Raemon worker"
    logger.close
    exit
  end
  
  # Start the master loop which spawns and monitors workers
  master_loop!
end

#stop(graceful = true) ⇒ Object

Terminates all workers, but does not exit master process



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/raemon/master.rb', line 57

def stop(graceful=true)
  kill_each_worker(graceful ? :QUIT : :TERM)
  timeleft = timeout
  step = 0.2
  reap_all_workers
  until WORKERS.empty?
    sleep(step)
    reap_all_workers
    (timeleft -= step) > 0 and next
    kill_each_worker(:KILL)
  end
end

#worker_heartbeat!(worker) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/raemon/master.rb', line 70

def worker_heartbeat!(worker)
  return if timeout <= 0
  @last_pulse ||= 0
  
  begin
    if Time.now.to_i > @last_pulse + (timeout/2) 
      # Pulse (our lifeline to the master process)
      @last_pulse = Time.now.to_i
      @p = 0 == @p ? 1 : 0
      worker.pulse.chmod(@p)

      # Make sure master is still around otherwise exit
      master_pid == Process.ppid or return
    end
  rescue => ex
    if worker.pulse
      logger.error "Unhandled listen loop exception #{ex.inspect}."
      logger.error ex.backtrace.join("\n")
    end
  end
end