Class: ActionEvent::Commands::Poller

Inherits:
Object
  • Object
show all
Defined in:
lib/action_event/commands/poller.rb

Instance Method Summary collapse

Constructor Details

#initializePoller

Returns a new instance of Poller.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/action_event/commands/poller.rb', line 7

def initialize
  @options = {
    :id => 1,
    :queues => %W(high medium low),
    :command => 'start',
    :environment => RAILS_ENV,
    :daemon => false,
    :max_load_average => 8,
    :min_instances => 5,
    :max_instances => 200,
    :max_adjustment => 5,
    :min_queue_size => 1000
  }
  
  OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [options] <command>"
    opts.on("-d", "--daemon", "Run as a daemon") { |v| @options[:daemon] = v }
    opts.on("-i", "--id=N", Integer, "Specify ID used in PID file when running as daemon") { |v| @options[:id] = v }
    opts.on("-q", "--queues='high medium low'", "Specify queue names in order") { |v| @options[:queues] = v.split(' ') }
    opts.on("-e", "--environment=development", "Specify which rails environment to run in") { |v| @options[:environment] = v }
    opts.separator ""
    opts.separator "Cluster options:"
    opts.on("-l", "--load-average=8", "Specify what load average to optimize to") { |v| @options[:max_load_average] = v }
    opts.on("-m", "--min-instances=5", "Specify mimimum number of instances") { |v| @options[:min_instances] = v }
    opts.on("-x", "--max-instances=200", "Specify maximum number of instances") { |v| @options[:max_instances] = v }
    opts.on("-a", "--max-adjustment=5", "Specify how many the maximum amount of instances that will be adjusted") { |v| @options[:max_adjustment] = v }
    opts.on("-s", "--min-queue-size=1000", "Specify how many must be in the queue to adjust instances") { |v| @options[:min_queue_size] = v }
    opts.separator ""
    opts.separator "Commands:"
    opts.separator "    start - starts up the poller"
    opts.separator "    stop - stops a poller currently running as a daemon"
    opts.separator "    status - prints the status of the queues"
    opts.separator ""
    opts.separator "Examples:"
    opts.separator "    #{$0} start                       (starts a poller running in the console)"
    opts.separator "    #{$0} -d -e production start      (starts a poller running as a daemon with ID #1)"
    opts.separator "    #{$0} --daemon --id=5 start       (starts poller with ID #5)"
    opts.separator "    #{$0} --daemon --id=5 stop        (stops poller with ID #5)"
  end.parse!

  @options[:command] = ARGV.pop unless ARGV.empty?

  case
    when @options[:command] == 'start' && !@options[:daemon] then trap_ctrl_c and load_rails_environment and start_processing_loop
    when @options[:command] == 'start' && @options[:daemon] then trap_term and start_daemon and load_rails_environment and start_processing_loop and remove_pid
    when @options[:command] == 'stop' then stop_daemon
    when @options[:command] == 'cluster' then load_rails_environment and refresh_cluster
    when @options[:command] == 'status' then load_rails_environment and print_status
  end
end

Instance Method Details

#load_rails_environmentObject



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/action_event/commands/poller.rb', line 64

def load_rails_environment
  ENV['ACTION_EVENT_USE_POLLER_DB'] = 'true'
  ENV["RAILS_ENV"] = @options[:environment]
  RAILS_ENV.replace(@options[:environment])
  log "Loading #{RAILS_ENV} environment..."
  require "#{RAILS_ROOT}/config/environment"
  
  if defined?(NewRelic)
    NewRelic::Control.instance.instance_eval do
      @settings['app_name'] = @settings['app_name'] + ' (Poller)'
    end
  end
  
  true
end

#log(message) ⇒ Object



229
230
231
232
# File 'lib/action_event/commands/poller.rb', line 229

def log(message)
  $stdout.puts "[#{"#{@options[:id]}:#{Process.pid} " if @options[:daemon]}#{Time.now}] #{message}"
  $stdout.flush
end

#next_iterationObject

if we can get a message, process it



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/action_event/commands/poller.rb', line 197

def next_iteration
  reload_application if RAILS_ENV == 'development'
  if message = ActionEvent::Message.try_to_get_next_message(@options[:queues])
    begin
      log_text = "#{message[:queue_name]}:#{message[:event]} (#{message[:params].inspect})"
      log "Processing #{log_text}"
      "#{message[:event]}_event".camelize.constantize.process(message[:params])
      log "Finished processing #{log_text}"
    rescue Exception => e
      log "Error processing #{log_text}: #{e} #{e.backtrace.join("\n")}"
    end
    return true
  else
    # return false if we didn't get a message... makes start_processing_loop sleep(1)
    return false
  end
rescue Exception => e
  log "Error getting next message (#{e})"
  ActionEvent::Message.connection.verify! rescue log("Error verifying DB connection... sleeping 5 seconds. (#{$!})") and sleep(5)
  return true
end

#pid_filenameObject

returns the name of the PID file to use for daemons



81
82
83
# File 'lib/action_event/commands/poller.rb', line 81

def pid_filename
  @pid_filename ||= File.join(RAILS_ROOT, "/log/poller.#{@options[:id]}.pid")
end


58
59
60
61
62
# File 'lib/action_event/commands/poller.rb', line 58

def print_status
  ActionEvent::Message.queue_status(@options[:queues]).to_a.sort { |a,b| a.first <=> b.first }.each do |table,messages_left|
    log "#{table}:\t\t#{messages_left}"
  end        
end

#refresh_clusterObject



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/action_event/commands/poller.rb', line 127

def refresh_cluster
  # gather some current stats
  current_load = `uptime`.split(' ')[-3..-3][0].to_f
  current_queue = ActionEvent::Message.queue_status(*@options[:queues]).to_a.map(&:last).sum

  # remove stale pid files
  current_pids = Dir[File.join(RAILS_ROOT, "log/poller.*.pid")]
  active_pids, stale_pids = current_pids.partition { |f| (File.read("/proc/#{File.read(f).to_i}/cmdline").include?('poller') rescue false) }
  stale_pids.each { |f| File.delete(f) }

  # compute adjustment based on current load average and queue size
  if active_pids.length > 0
    current_instances = active_pids.length
    needed_instances = ((current_instances*@options[:max_load_average])/current_load).floor

    if needed_instances > current_instances
      needed_instances = [needed_instances, current_instances + @options[:max_adjustment]].min
    elsif needed_instances < current_instances && current_queue > @options[:min_queue_size]
      needed_instances = [needed_instances, current_instances - @options[:max_adjustment]].max
    end
  else
    current_instances = 0
    needed_instances = @options[:min_instances]
  end

  needed_instances = @options[:max_instances] if needed_instances > @options[:max_instances]
  needed_instances = @options[:min_instances] if needed_instances < @options[:min_instances]


  # remove pids if there's too many or spawn new ones if there's not enough
  if needed_instances < current_instances
    active_pids.last(current_instances - needed_instances).each { |pid_file| puts "delete #{pid_file}" } #File.delete(pid_file) }
  elsif needed_instances > current_instances
    (needed_instances - current_instances).times do
      next_id = (1..needed_instances).to_a.find { |i| !File.exists?(File.join(RAILS_ROOT, "log/poller.#{i}.pid")) }
      puts "start at id #{next_id}"
      # if fork
      # 
      # end
    end
  end
end

#reload_applicationObject



219
220
221
222
223
224
225
226
227
# File 'lib/action_event/commands/poller.rb', line 219

def reload_application
  ActionController::Routing::Routes.reload
  ActionController::Base.view_paths.reload! rescue nil
  ActionView::Helpers::AssetTagHelper::AssetTag::Cache.clear rescue nil

  ActiveRecord::Base.reset_subclasses
  ActiveSupport::Dependencies.clear
  ActiveRecord::Base.clear_reloadable_connections!
end

#remove_pidObject



182
183
184
185
186
187
# File 'lib/action_event/commands/poller.rb', line 182

def remove_pid
  if Process.pid == (File.read(pid_filename).to_i rescue nil)
    log "Cleaning up PID file..."
    FileUtils.rm(pid_filename) 
  end
end

#should_stop_processing?Boolean

Returns:

  • (Boolean)


170
171
172
# File 'lib/action_event/commands/poller.rb', line 170

def should_stop_processing?
  @stop_processing || (@options[:daemon] && (File.read(pid_filename).to_i rescue 0) != Process.pid)
end

#start_daemonObject

forks from the current process and closes out everything



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/action_event/commands/poller.rb', line 86

def start_daemon
  log "Starting daemon ##{@options[:id]}..."

  # some process magic
  exit if fork                     # Parent exits, child continues.
  Process.setsid                   # Become session leader.
  exit if fork                     # Zap session leader. 
  Dir.chdir "/"                    # Release old working directory.
  File.umask 0000                  # Ensure sensible umask. Adjust as needed.

  # Free file descriptors and point them somewhere sensible.
  STDIN.reopen "/dev/null"
  STDOUT.reopen File.join(RAILS_ROOT, "log/poller.log"), "a"
  STDERR.reopen STDOUT

  # don't start up until the previous poller is dead
  while (previous_pid = File.read(pid_filename).to_i rescue nil) do
    break unless File.exists?("/proc/#{previous_pid}")
    log "Waiting for previous poller to finish..."
    Process.kill('TERM', previous_pid) 
    sleep(5)
  end
  
  # record pid
  File.open(pid_filename, 'w') { |f| f << Process.pid }
end

#start_processing_loopObject

loops until should_stop_processing? set to true… in local mode, this is never set so it will loop forever



190
191
192
193
194
# File 'lib/action_event/commands/poller.rb', line 190

def start_processing_loop
  log "Processing queues: #{@options[:queues].join(',')}"
  next_iteration or sleep(0.5) until should_stop_processing?
  log "Got signal to stop... exiting."
end

#stop_daemonObject

finds the already running daemon and stops it…



175
176
177
178
179
180
# File 'lib/action_event/commands/poller.rb', line 175

def stop_daemon
  if previous_pid = File.read(pid_filename).to_i rescue nil
    log "Sending stop signal to daemon ##{@options[:id]}..."
    Process.kill('TERM', previous_pid) 
  end
end

#trap_ctrl_cObject



113
114
115
116
117
118
# File 'lib/action_event/commands/poller.rb', line 113

def trap_ctrl_c
  trap("SIGINT") do
    @stop_processing = true
    log "Sending stop signal..."
  end
end

#trap_termObject



120
121
122
123
124
125
# File 'lib/action_event/commands/poller.rb', line 120

def trap_term
  trap("SIGTERM") do
    @stop_processing = true
    log "Received stop signal..."
  end
end