Class: Delayed::Command
- Inherits:
-
Object
- Object
- Delayed::Command
- Defined in:
- lib/delayed/command.rb
Overview
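Command-line front end for delayed_job: parses the launcher options and starts worker processes, either daemonized or in the current process. (Source comment: rubocop:disable ClassLength)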
Instance Attribute Summary collapse
-
#worker_count ⇒ Object
Returns the value of attribute worker_count.
-
#worker_pools ⇒ Object
Returns the value of attribute worker_pools.
Instance Method Summary collapse
-
#daemonize ⇒ Object
rubocop:disable PerceivedComplexity.
-
#initialize(args) ⇒ Command
constructor
rubocop:disable MethodLength.
- #run(worker_name = nil, options = {}) ⇒ Object
- #run_process(process_name, options = {}) ⇒ Object
- #setup_pools ⇒ Object
Constructor Details
#initialize(args) ⇒ Command
rubocop:disable MethodLength
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/delayed/command.rb', line 14

# Parses the launcher's command-line arguments.
#
# Starts from the defaults (+:quiet+ on, pid files under
# "#{Rails.root}/tmp/pids", one worker, no monitor process) and lets the
# switches in +args+ override them.
#
# @param args [Array<String>] raw command-line arguments. Recognised switches
#   are removed in place by +OptionParser#parse!+; whatever is left over is
#   stored in +@args+.
def initialize(args) # rubocop:disable MethodLength
  @options = {
    :quiet => true,
    :pid_dir => "#{Rails.root}/tmp/pids"
  }

  @worker_count = 1
  @monitor = false

  opts = OptionParser.new do |opt|
    opt.banner = "Usage: #{File.basename($PROGRAM_NAME)} [options] start|stop|restart|run"

    opt.on('-h', '--help', 'Show this message') do
      puts opt
      exit 1
    end
    opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e|
      STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/#issue/7'
    end
    opt.on('--min-priority N', 'Minimum priority of jobs to run.') do |n|
      @options[:min_priority] = n
    end
    opt.on('--max-priority N', 'Maximum priority of jobs to run.') do |n|
      @options[:max_priority] = n
    end
    opt.on('-n', '--number_of_workers=workers', 'Number of unique workers to spawn') do |worker_count|
      # String#to_i never raises (non-numeric input yields 0), so no rescue is needed.
      @worker_count = worker_count.to_i
    end
    opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir|
      @options[:pid_dir] = dir
    end
    opt.on('-i', '--identifier=n', 'A numeric identifier for the worker.') do |n|
      @options[:identifier] = n
    end
    opt.on('-m', '--monitor', 'Start monitor process.') do
      @monitor = true
    end
    opt.on('--sleep-delay N', 'Amount of time to sleep when no jobs are found') do |n|
      @options[:sleep_delay] = n.to_i
    end
    opt.on('--read-ahead N', 'Number of jobs from the queue to consider') do |n|
      @options[:read_ahead] = n
    end
    opt.on('-p', '--prefix NAME', 'String to be prefixed to worker process names') do |prefix|
      @options[:prefix] = prefix
    end
    # --queues and --queue are synonyms; both accept a comma-separated list.
    opt.on('--queues=queues', 'Specify which queue DJ must look up for jobs') do |queues|
      @options[:queues] = queues.split(',')
    end
    opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue|
      @options[:queues] = queue.split(',')
    end
    opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool|
      parse_worker_pool(pool)
    end
    opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do
      @options[:exit_on_complete] = true
    end
  end

  @args = opts.parse!(args)
end
Instance Attribute Details
#worker_count ⇒ Object
Returns the value of attribute worker_count.
12 13 14 |
# File 'lib/delayed/command.rb', line 12

# Returns the value of attribute worker_count.
#
# The constructor sets it to 1 and the -n/--number_of_workers switch replaces
# it with the parsed integer.
#
# @return [Integer] number of worker processes to spawn
def worker_count
  @worker_count
end
#worker_pools ⇒ Object
Returns the value of attribute worker_pools.
12 13 14 |
# File 'lib/delayed/command.rb', line 12

# Returns the value of attribute worker_pools.
#
# #daemonize treats a falsy value as "no pools configured"; #setup_pools
# iterates the value as (queues, worker count) pairs.
# NOTE(review): presumably filled in by parse_worker_pool for each --pool
# switch -- that helper is not shown here, confirm.
#
# @return [Object, nil] the configured worker pools, if any
def worker_pools
  @worker_pools
end
Instance Method Details
#daemonize ⇒ Object
rubocop:disable PerceivedComplexity
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/delayed/command.rb', line 76

# Starts the configured worker processes as daemons.
#
# Makes sure the pid directory exists, then picks one of three modes:
# * worker pools configured  -> delegate to #setup_pools
# * an :identifier was given -> one process named "delayed_job.<identifier>"
# * otherwise                -> +worker_count+ processes, named "delayed_job"
#   when there is only one and "delayed_job.<index>" when there are several
#
# @raise [ArgumentError] if an identifier is combined with more than one worker
def daemonize # rubocop:disable PerceivedComplexity
  # Required here because the file's top-level requires are outside this block.
  require 'fileutils'
  # mkdir_p also creates missing parents (e.g. a fresh checkout without tmp/)
  # and is a no-op when the directory already exists.
  FileUtils.mkdir_p(@options[:pid_dir])

  if worker_pools
    setup_pools
  elsif @options[:identifier]
    raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier' if worker_count > 1

    run_process("delayed_job.#{@options[:identifier]}", @options)
  else
    worker_count.times do |worker_index|
      process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}"
      run_process(process_name, @options)
    end
  end
end
#run(worker_name = nil, options = {}) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/delayed/command.rb', line 116

# Runs one worker in the current process.
#
# Changes into +Rails.root+, invokes +Delayed::Worker.after_fork+, installs a
# logger writing to log/delayed_job.log unless one is already set, then
# builds a worker from +options+ and starts it.
#
# Any StandardError raised along the way is logged as fatal to +Rails.logger+,
# its message is written to STDERR, and the process exits with status 1.
#
# @param worker_name [String, nil] used, followed by a space, as the worker's
#   name prefix
# @param options [Hash] passed unchanged to +Delayed::Worker.new+
def run(worker_name = nil, options = {})
  Dir.chdir(Rails.root)

  Delayed::Worker.after_fork
  Delayed::Worker.logger ||= Logger.new(File.join(Rails.root, 'log', 'delayed_job.log'))

  worker = Delayed::Worker.new(options)
  worker.name_prefix = "#{worker_name} "
  worker.start
rescue => e
  Rails.logger.fatal e
  STDERR.puts e.message
  exit 1
end
#run_process(process_name, options = {}) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/delayed/command.rb', line 108

# Launches one daemonized worker process through +Daemons.run_proc+.
#
# Calls +Delayed::Worker.before_fork+ first. Inside the daemon block the
# process title is optionally prefixed and control passes to #run.
#
# @param process_name [String] daemon name, e.g. "delayed_job.0"
# @param options [Hash] worker options; +:pid_dir+ tells Daemons where to keep
#   the pid file, +:prefix+ (optional) is joined in front of the process
#   title, and the whole hash is forwarded to #run
def run_process(process_name, options = {})
  Delayed::Worker.before_fork
  Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args|
    # Guard on the same hash the prefix is read from, so a caller whose
    # options lack :prefix never reaches File.join(nil, ...).
    $0 = File.join(options[:prefix], process_name) if options[:prefix]
    run process_name, options
  end
end
#setup_pools ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/delayed/command.rb', line 96

# Starts the daemons for every configured worker pool.
#
# Each entry of +@worker_pools+ is a (queues, size) pair. For each pool,
# +size+ processes are launched with the base options plus that pool's
# +:queues+. Process names are "delayed_job.<n>", where n keeps counting
# across pools so every process gets a unique name.
def setup_pools
  worker_index = 0
  @worker_pools.each do |queues, pool_size|
    pool_options = @options.merge(:queues => queues)
    pool_size.times do
      run_process("delayed_job.#{worker_index}", pool_options)
      worker_index += 1
    end
  end
end