Class: ActionEvent::Commands::Poller
- Inherits:
-
Object
- Object
- ActionEvent::Commands::Poller
- Defined in:
- lib/action_event/commands/poller.rb
Instance Method Summary collapse
-
#initialize ⇒ Poller
constructor
A new instance of Poller.
- #load_rails_environment ⇒ Object
- #log(message) ⇒ Object
-
#next_iteration ⇒ Object
if we can get a message, process it.
-
#pid_filename ⇒ Object
returns the name of the PID file to use for daemons.
- #print_status ⇒ Object
- #refresh_cluster ⇒ Object
- #reload_application ⇒ Object
- #remove_pid ⇒ Object
- #should_stop_processing? ⇒ Boolean
-
#start_daemon ⇒ Object
forks from the current process and closes out everything.
-
#start_processing_loop ⇒ Object
loops until should_stop_processing? returns true…
-
#stop_daemon ⇒ Object
finds the already running daemon and stops it…
- #trap_ctrl_c ⇒ Object
- #trap_term ⇒ Object
Constructor Details
#initialize ⇒ Poller
Returns a new instance of Poller.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/action_event/commands/poller.rb', line 7

# Builds the default options, parses the command line with OptionParser and
# immediately dispatches on the requested command.
#
# The last non-option argument left in ARGV (if any) replaces the default
# 'start' command. Handled commands are 'start', 'stop', 'cluster' and
# 'status'; any other value matches no branch and nothing is run.
def initialize
  @options = {
    :id               => 1,
    :queues           => %w(high medium low),
    :command          => 'start',
    :environment      => RAILS_ENV,
    :daemon           => false,
    :max_load_average => 8,
    :min_instances    => 5,
    :max_instances    => 200,
    :max_adjustment   => 5,
    :min_queue_size   => 1000
  }

  OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [options] <command>"
    opts.on("-d", "--daemon", "Run as a daemon") { |v| @options[:daemon] = v }
    opts.on("-i", "--id=N", Integer, "Specify ID used in PID file when running as daemon") { |v| @options[:id] = v }
    opts.on("-q", "--queues='high medium low'", "Specify queue names in order") { |v| @options[:queues] = v.split(' ') }
    opts.on("-e", "--environment=development", "Specify which rails environment to run in") { |v| @options[:environment] = v }

    opts.separator ""
    opts.separator "Cluster options:"
    # These values are used in arithmetic and numeric comparisons by
    # refresh_cluster, so let OptionParser convert them; untyped options
    # would arrive as Strings and overwrite the numeric defaults.
    opts.on("-l", "--load-average=8", Float, "Specify what load average to optimize to") { |v| @options[:max_load_average] = v }
    opts.on("-m", "--min-instances=5", Integer, "Specify mimimum number of instances") { |v| @options[:min_instances] = v }
    opts.on("-x", "--max-instances=200", Integer, "Specify maximum number of instances") { |v| @options[:max_instances] = v }
    opts.on("-a", "--max-adjustment=5", Integer, "Specify how many the maximum amount of instances that will be adjusted") { |v| @options[:max_adjustment] = v }
    opts.on("-s", "--min-queue-size=1000", Integer, "Specify how many must be in the queue to adjust instances") { |v| @options[:min_queue_size] = v }

    opts.separator ""
    opts.separator "Commands:"
    opts.separator " start - starts up the poller"
    opts.separator " stop - stops a poller currently running as a daemon"
    opts.separator " status - prints the status of the queues"

    opts.separator ""
    opts.separator "Examples:"
    opts.separator " #{$0} start (starts a poller running in the console)"
    opts.separator " #{$0} -d -e production start (starts a poller running as a daemon with ID #1)"
    opts.separator " #{$0} --daemon --id=5 start (starts poller with ID #5)"
    opts.separator " #{$0} --daemon --id=5 stop (stops poller with ID #5)"
  end.parse!

  # parse! strips the recognised switches from ARGV; whatever is left last
  # is taken as the command.
  @options[:command] = ARGV.pop unless ARGV.empty?

  # The steps are chained with `and` on purpose: each step only runs when
  # the previous one returned a truthy value.
  case @options[:command]
  when 'start'
    if @options[:daemon]
      trap_term and start_daemon and load_rails_environment and start_processing_loop and remove_pid
    else
      trap_ctrl_c and load_rails_environment and start_processing_loop
    end
  when 'stop'
    stop_daemon
  when 'cluster'
    load_rails_environment and refresh_cluster
  when 'status'
    load_rails_environment and print_status
  end
end
Instance Method Details
#load_rails_environment ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/action_event/commands/poller.rb', line 64

# Switches the process to the configured Rails environment and boots the
# application by requiring config/environment under RAILS_ROOT.
#
# Sets ENV['ACTION_EVENT_USE_POLLER_DB'] to 'true' before loading. When
# NewRelic is defined, ' (Poller)' is appended to its 'app_name' setting.
#
# Always returns true so callers can chain further steps with `and`.
def load_rails_environment
  environment = @options[:environment]

  ENV['ACTION_EVENT_USE_POLLER_DB'] = 'true'
  ENV["RAILS_ENV"] = environment
  # RAILS_ENV is mutated in place rather than reassigned.
  RAILS_ENV.replace(environment)

  log("Loading #{RAILS_ENV} environment...")
  require "#{RAILS_ROOT}/config/environment"

  return true unless defined?(NewRelic)

  # instance_eval: @settings here is the NewRelic control instance's ivar.
  NewRelic::Control.instance.instance_eval { @settings['app_name'] += ' (Poller)' }
  true
end
#log(message) ⇒ Object
229 230 231 232 |
# File 'lib/action_event/commands/poller.rb', line 229

# Writes one timestamped line to $stdout and flushes it immediately.
#
# When running as a daemon the line is prefixed with "<id>:<pid> " so that
# output from several pollers can be told apart.
#
# message - the text to log.
#
# Returns $stdout (the result of IO#flush), which is truthy; callers chain
# on it with `and`.
def log(message)
  prefix = "#{@options[:id]}:#{Process.pid} " if @options[:daemon]
  $stdout.puts "[#{prefix}#{Time.now}] #{message}"
  $stdout.flush
end
#next_iteration ⇒ Object
if we can get a message, process it
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# Tries to fetch one message from the configured queues and process it.
#
# In the 'development' environment the application is reloaded first. The
# event class is resolved from the message's :event value
# ("<event>_event".camelize.constantize) and its .process is called with the
# message's :params.
#
# Returns true after a message was handled (even if processing raised) and
# after an error while fetching; returns false when no message was
# available, which makes start_processing_loop sleep.
#
# NOTE(review): this listing appears to have lost identifiers during
# extraction: `if = ActionEvent::Message.(...)` and the bare `[:queue_name]`,
# `[:event]`, `[:params]` have no receiver. Presumably a local `message` and
# a fetch method on ActionEvent::Message were dropped - confirm against
# lib/action_event/commands/poller.rb before copying this code.
# NOTE(review): both rescue clauses catch Exception, not StandardError, so
# they also swallow exceptions such as SystemExit - confirm that is intended.
# File 'lib/action_event/commands/poller.rb', line 197 def next_iteration reload_application if RAILS_ENV == 'development' if = ActionEvent::Message.(@options[:queues]) begin log_text = "#{[:queue_name]}:#{[:event]} (#{[:params].inspect})" log "Processing #{log_text}" "#{[:event]}_event".camelize.constantize.process([:params]) log "Finished processing #{log_text}" rescue Exception => e log "Error processing #{log_text}: #{e} #{e.backtrace.join("\n")}" end return true else # return false if we didn't get a message... makes start_processing_loop sleep(1) return false end rescue Exception => e log "Error getting next message (#{e})" ActionEvent::Message.connection.verify! rescue log("Error verifying DB connection... sleeping 5 seconds. (#{$!})") and sleep(5) return true end |
#pid_filename ⇒ Object
returns the name of the PID file to use for daemons
81 82 83 |
# File 'lib/action_event/commands/poller.rb', line 81

# Returns the name of the PID file to use for daemons:
# <RAILS_ROOT>/log/poller.<id>.pid
#
# The path is built from relative segments, matching the "log/poller.*.pid"
# form used elsewhere in this class, instead of a "/log/..." segment with a
# leading slash. The resulting path is unchanged.
#
# The value is memoized on first use, so later changes to @options[:id] do
# not affect it.
def pid_filename
  @pid_filename ||= File.join(RAILS_ROOT, 'log', "poller.#{@options[:id]}.pid")
end
#print_status ⇒ Object
58 59 60 61 62 |
# File 'lib/action_event/commands/poller.rb', line 58

# Logs one "<table>:\t\t<count>" line per entry returned by
# ActionEvent::Message.queue_status, sorted by the entry's first element.
#
# NOTE(review): the queues array is passed as a single argument here, while
# refresh_cluster splats it (queue_status(*@options[:queues])) - confirm
# which call form queue_status expects.
def print_status
  statuses = ActionEvent::Message.queue_status(@options[:queues]).to_a
  statuses.sort_by { |pair| pair.first }.each do |table, count|
    log "#{table}:\t\t#{count}"
  end
end
#refresh_cluster ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/action_event/commands/poller.rb', line 127

# Works out how many poller instances should be running, based on the
# system load and the total queue size, and reports the adjustment.
#
# Stale PID files (whose process has no /proc/<pid>/cmdline containing
# 'poller') are deleted. The adjustment itself is only printed with puts:
# the code that would delete PID files or fork new pollers is commented out.
def refresh_cluster
  # gather some current stats
  # Third-from-last field of `uptime` (presumably the 1-minute load average).
  # A missing field becomes 0.0 instead of raising.
  current_load = `uptime`.split(' ')[-3].to_f
  current_queue = ActionEvent::Message.queue_status(*@options[:queues]).to_a.map(&:last).sum

  # remove stale pid files
  current_pids = Dir[File.join(RAILS_ROOT, "log/poller.*.pid")]
  active_pids, stale_pids = current_pids.partition do |f|
    (File.read("/proc/#{File.read(f).to_i}/cmdline").include?('poller') rescue false)
  end
  stale_pids.each { |f| File.delete(f) }

  # compute adjustment based on current load average and queue size
  if active_pids.length > 0
    current_instances = active_pids.length
    needed_instances =
      if current_load > 0
        ((current_instances * @options[:max_load_average]) / current_load).floor
      else
        # Dividing by a zero load gives Infinity, and Infinity.floor raises
        # FloatDomainError. An idle machine means "grow as fast as allowed".
        current_instances + @options[:max_adjustment]
      end

    if needed_instances > current_instances
      needed_instances = [needed_instances, current_instances + @options[:max_adjustment]].min
    elsif needed_instances < current_instances && current_queue > @options[:min_queue_size]
      # NOTE(review): the scale-down step is only capped while the queue is
      # above min_queue_size; otherwise the lower target is used uncapped.
      # Confirm that this is the intended policy.
      needed_instances = [needed_instances, current_instances - @options[:max_adjustment]].max
    end
  else
    current_instances = 0
    needed_instances = @options[:min_instances]
  end
  needed_instances = @options[:max_instances] if needed_instances > @options[:max_instances]
  needed_instances = @options[:min_instances] if needed_instances < @options[:min_instances]

  # remove pids if there's too many or spawn new ones if there's not enough
  if needed_instances < current_instances
    active_pids.last(current_instances - needed_instances).each { |pid_file| puts "delete #{pid_file}" } #File.delete(pid_file) }
  elsif needed_instances > current_instances
    (needed_instances - current_instances).times do
      # File.exist? instead of File.exists?, which was removed in Ruby 3.2.
      next_id = (1..needed_instances).find { |i| !File.exist?(File.join(RAILS_ROOT, "log/poller.#{i}.pid")) }
      puts "start at id #{next_id}"
      # if fork
      #
      # end
    end
  end
end
#reload_application ⇒ Object
219 220 221 222 223 224 225 226 227 |
# File 'lib/action_event/commands/poller.rb', line 219

# Resets the reloadable application state: routes, view paths, the asset
# tag cache, ActiveRecord subclasses, loaded dependencies and reloadable
# database connections.
#
# Failures while reloading the view paths or clearing the asset tag cache
# are ignored; the other steps are allowed to raise.
def reload_application
  ActionController::Routing::Routes.reload

  begin
    ActionController::Base.view_paths.reload!
  rescue StandardError
    nil
  end

  begin
    ActionView::Helpers::AssetTagHelper::AssetTag::Cache.clear
  rescue StandardError
    nil
  end

  ActiveRecord::Base.reset_subclasses
  ActiveSupport::Dependencies.clear
  ActiveRecord::Base.clear_reloadable_connections!
end
#remove_pid ⇒ Object
182 183 184 185 186 187 |
# File 'lib/action_event/commands/poller.rb', line 182

# Deletes the PID file, but only when it records this very process.
#
# An unreadable or missing PID file is treated as "not ours" and left alone.
def remove_pid
  recorded_pid =
    begin
      File.read(pid_filename).to_i
    rescue StandardError
      nil
    end
  return unless recorded_pid == Process.pid

  log "Cleaning up PID file..."
  FileUtils.rm(pid_filename)
end
#should_stop_processing? ⇒ Boolean
170 171 172 |
# File 'lib/action_event/commands/poller.rb', line 170

# Tells the processing loop whether it has to stop.
#
# Truthy when the stop flag has been set or, in daemon mode, when the PID
# file no longer records this process. An unreadable PID file counts as
# pid 0, which never matches.
def should_stop_processing?
  return @stop_processing if @stop_processing

  daemon = @options[:daemon]
  return daemon unless daemon

  owner_pid =
    begin
      File.read(pid_filename).to_i
    rescue StandardError
      0
    end
  owner_pid != Process.pid
end
#start_daemon ⇒ Object
forks from the current process and closes out everything
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/action_event/commands/poller.rb', line 86

# Forks from the current process and closes out everything: detaches from
# the terminal, redirects the standard streams, waits for any previous
# poller with the same PID file to exit, then records this process's PID.
#
# Returns the result of the final File.open block, which is truthy; the
# caller chains on it with `and`.
def start_daemon
  log "Starting daemon ##{@options[:id]}..."

  # some process magic
  exit if fork                     # Parent exits, child continues.
  Process.setsid                   # Become session leader.
  exit if fork                     # Zap session leader.
  Dir.chdir "/"                    # Release old working directory.
  File.umask 0000                  # Ensure sensible umask. Adjust as needed.

  # Free file descriptors and point them somewhere sensible.
  STDIN.reopen "/dev/null"
  STDOUT.reopen File.join(RAILS_ROOT, "log/poller.log"), "a"
  STDERR.reopen STDOUT

  # don't start up until the previous poller is dead
  while (previous_pid = File.read(pid_filename).to_i rescue nil) do
    # File.exist? instead of File.exists?, which was removed in Ruby 3.2.
    break unless File.exist?("/proc/#{previous_pid}")
    log "Waiting for previous poller to finish..."
    begin
      Process.kill('TERM', previous_pid)
    rescue Errno::ESRCH
      # The previous poller exited between the /proc check and the signal.
      break
    end
    sleep(5)
  end

  # record pid
  File.open(pid_filename, 'w') { |f| f << Process.pid }
end
#start_processing_loop ⇒ Object
Loops until should_stop_processing? returns true. In local (non-daemon) mode that only happens once Ctrl-C (SIGINT) sets the stop flag; otherwise it loops forever.
190 191 192 193 194 |
# File 'lib/action_event/commands/poller.rb', line 190

# Runs iterations until should_stop_processing? becomes truthy.
#
# The stop condition is checked before every iteration. When an iteration
# returns a falsy value, the loop pauses for half a second before the next
# check.
def start_processing_loop
  log "Processing queues: #{@options[:queues].join(',')}"

  until should_stop_processing?
    sleep(0.5) unless next_iteration
  end

  log "Got signal to stop... exiting."
end
#stop_daemon ⇒ Object
finds the already running daemon and stops it…
175 176 177 178 179 180 |
# File 'lib/action_event/commands/poller.rb', line 175

# Finds the already running daemon through its PID file and sends it TERM.
#
# Does nothing when the PID file cannot be read or does not hold a positive
# PID. An empty or garbage file parses to 0, and signalling pid 0 would
# TERM the caller's whole process group. A PID with no live process is
# logged instead of raising Errno::ESRCH.
def stop_daemon
  previous_pid =
    begin
      File.read(pid_filename).to_i
    rescue StandardError
      nil
    end
  return unless previous_pid && previous_pid > 0

  log "Sending stop signal to daemon ##{@options[:id]}..."
  begin
    Process.kill('TERM', previous_pid)
  rescue Errno::ESRCH
    log "Daemon ##{@options[:id]} is not running (stale PID file)."
  end
end
#trap_ctrl_c ⇒ Object
113 114 115 116 117 118 |
# File 'lib/action_event/commands/poller.rb', line 113

# Installs a SIGINT (Ctrl-C) handler that sets the stop flag and logs it.
#
# Returns whatever Signal.trap returns (the previously installed handler).
def trap_ctrl_c
  Signal.trap("SIGINT") {
    @stop_processing = true
    log("Sending stop signal...")
  }
end
#trap_term ⇒ Object
120 121 122 123 124 125 |
# File 'lib/action_event/commands/poller.rb', line 120

# Installs a SIGTERM handler that sets the stop flag and logs it.
#
# Returns whatever Signal.trap returns (the previously installed handler).
def trap_term
  Signal.trap("SIGTERM") {
    @stop_processing = true
    log("Received stop signal...")
  }
end