Class: Cyclop::Worker
- Inherits:
-
Object
- Object
- Cyclop::Worker
- Defined in:
- lib/cyclop/worker.rb
Instance Attribute Summary collapse
-
#actions ⇒ Object
Path to actions directory.
-
#die_after ⇒ Object
Number of jobs to process before exiting.
-
#job_opts ⇒ Object
Options passed to Cyclop.next to get next job.
-
#logger ⇒ Object
Logger for master.
-
#processed_jobs ⇒ Object
Number of jobs processed by this worker.
-
#queues ⇒ Object
Queues to process.
-
#sleep_interval ⇒ Object
How much time to sleep between poll.
Instance Method Summary collapse
-
#initialize(config = {}) ⇒ Worker
constructor
A new instance of Worker.
-
#perform(job) ⇒ Object
Called inside forked process.
-
#run ⇒ Object
Start processing jobs.
-
#stop ⇒ Object
Gracefull shutdown.
-
#stop! ⇒ Object
Forced shutdown.
Constructor Details
#initialize(config = {}) ⇒ Worker
Returns a new instance of Worker.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/cyclop/worker.rb', line 18 def initialize(config={}) raise ArgumentError, 'mongo["database"] is required' unless config["mongo"] && config["mongo"]["database"] self.queues = config["queues"] || [] self.logger = Logger.new(config["log_file"] || $stdout) self.sleep_interval = config["sleep_interval"] || 1 self.actions = config["actions"] || "./actions" self.processed_jobs = 0 self.die_after = config["die_after"] @job_opts = {} if config["limit_to_host"] @job_opts[:host] = config["limit_to_host"] @job_opts[:host] = Cyclop.host if @job_opts[:host]=="localhost" end connection = if config["mongo"]["hosts"] Mongo::ReplSetConnection.new( *config["mongo"]["hosts"], rs_name: config["mongo"]["rs_name"], read_secondary: !!config["mongo"]["read_secondary"], logger: (logger if config["mongo"]["log"]), ) else Mongo::Connection.new( (config["mongo"]["host"] || "127.0.0.1"), (config["mongo"]["port"] || 27017), logger: (logger if config["mongo"]["log"]), ) end Cyclop.db = connection.db config["mongo"]["database"] end |
Instance Attribute Details
#actions ⇒ Object
Path to actions directory
10 11 12 |
# File 'lib/cyclop/worker.rb', line 10 def actions @actions end |
#die_after ⇒ Object
Number of jobs to process before exiting
12 13 14 |
# File 'lib/cyclop/worker.rb', line 12 def die_after @die_after end |
#job_opts ⇒ Object
Options passed to Cyclop.next to get next job
16 17 18 |
# File 'lib/cyclop/worker.rb', line 16 def job_opts @job_opts end |
#logger ⇒ Object
Logger for master
6 7 8 |
# File 'lib/cyclop/worker.rb', line 6 def logger @logger end |
#processed_jobs ⇒ Object
Number of jobs processed by this worker
14 15 16 |
# File 'lib/cyclop/worker.rb', line 14 def processed_jobs @processed_jobs end |
#queues ⇒ Object
Queues to process
4 5 6 |
# File 'lib/cyclop/worker.rb', line 4 def queues @queues end |
#sleep_interval ⇒ Object
How much time to sleep between poll
8 9 10 |
# File 'lib/cyclop/worker.rb', line 8 def sleep_interval @sleep_interval end |
Instance Method Details
#perform(job) ⇒ Object
Called inside forked process
Parameters:
-
(Cyclop::Job) job - the job to process
89 90 91 92 93 94 95 96 97 |
# File 'lib/cyclop/worker.rb', line 89 def perform(job) load_actions Cyclop::Action.find_by_queue(job.queue).perform(*job.job_params) 0 rescue Exception => e log e.to_s job.release! e 1 end |
#run ⇒ Object
Start processing jobs
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/cyclop/worker.rb', line 50 def run register_signal_handlers loop do if stop? log "Shutting down..." break end if job = next_job @sleeping = false if @pid = fork msg = "Forked process #{@pid} to work on job #{job.queue}-#{job._id}..." log msg procline msg Process.wait log "Child process #{@pid} ended with status: #{$?}" self.processed_jobs += 1 if $?.exitstatus==0 job.complete! else job.release! end else procline "Processing #{job.queue}-#{job._id} (started at #{Time.now.utc})" exit! perform job end else log "No more job to process, start sleeping..." unless @sleeping @sleeping = true sleep sleep_interval end end end |
#stop ⇒ Object
Gracefull shutdown
100 101 102 |
# File 'lib/cyclop/worker.rb', line 100 def stop @stop = true end |
#stop! ⇒ Object
Forced shutdown
105 106 107 108 109 110 111 |
# File 'lib/cyclop/worker.rb', line 105 def stop! if @pid Process.kill "TERM", @pid Process.wait end exit! end |