Class: QC::Worker
- Inherits: Object
- Defined in: lib/queue_classic/worker.rb
Instance Attribute Summary
- #queue ⇒ Object
  Returns the value of attribute queue.
- #running ⇒ Object
  Returns the value of attribute running.
Instance Method Summary
- #call(job) ⇒ Object
  Each job includes a method column.
- #fork_and_work ⇒ Object
  This method tells the Ruby process to fork.
- #handle_failure(job, e) ⇒ Object
  This method will be called when an exception is raised during the execution of the job.
- #initialize(args = {}) ⇒ Worker (constructor)
  If no arguments are passed to the initializer, the defaults are pulled from environment variables.
- #lock_job ⇒ Object
  Attempt to lock a job in the queue’s table.
- #log(data) ⇒ Object
- #process(job) ⇒ Object
  A job is processed by evaluating the target code.
- #setup_child ⇒ Object
  This method should be overridden if your worker is forking and you need to re-establish database connections.
- #start ⇒ Object
  Start a loop and work jobs indefinitely.
- #stop ⇒ Object
  Call this method to stop the worker.
- #work ⇒ Object
  This method will lock a job and process it.
Constructor Details
#initialize(args = {}) ⇒ Worker
If no arguments are passed to the initializer, the defaults are pulled from environment variables.
    # File 'lib/queue_classic/worker.rb', line 11
    def initialize(args={})
      @fork_worker = args[:fork_worker] || QC::FORK_WORKER
      @queue = QC::Queue.new((args[:q_name] || QC::QUEUE), args[:top_bound])
      log(args.merge(:at => "worker_initialized"))
      @running = true
    end
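The `args[:key] || DEFAULT` pattern in the constructor can be explored in isolation. The following is a minimal sketch, not queue_classic code: `DEFAULT_FORK`, `DEFAULT_QUEUE`, and `resolve_options` are hypothetical stand-ins for `QC::FORK_WORKER`, `QC::QUEUE`, and the constructor's option handling.

```ruby
# Hypothetical stand-ins for QC::FORK_WORKER and QC::QUEUE.
DEFAULT_FORK  = true
DEFAULT_QUEUE = "default"

# Mirrors the `args[:key] || DEFAULT` fallback used by the constructor.
def resolve_options(args = {})
  {
    :fork_worker => args[:fork_worker] || DEFAULT_FORK,
    :q_name      => args[:q_name] || DEFAULT_QUEUE
  }
end

no_args  = resolve_options
explicit = resolve_options(:q_name => "priority", :fork_worker => false)
```

Because `||` treats `false` the same as `nil`, `explicit[:fork_worker]` still resolves to the default in this sketch. With this pattern, an explicit `false` cannot switch off a truthy default.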
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
    # File 'lib/queue_classic/worker.rb', line 8
    def queue
      @queue
    end
#running ⇒ Object
Returns the value of attribute running.
    # File 'lib/queue_classic/worker.rb', line 8
    def running
      @running
    end
Instance Method Details
#call(job) ⇒ Object
Each job includes a method column. The worker uses Ruby’s eval to resolve the receiver object named in that column, then sends the method to that object and passes the job’s args.
    # File 'lib/queue_classic/worker.rb', line 83
    def call(job)
      args = job[:args]
      klass = eval(job[:method].split(".").first)
      message = job[:method].split(".").last
      klass.send(message, *args)
    end
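The dispatch step can be sketched without the library. In this example `Greeter` and the job hash are hypothetical, and `Object.const_get` is swapped in for `eval` to keep the sketch narrow; the worker itself uses `eval` as shown above.

```ruby
# Hypothetical job target; any constant that responds to the method would do.
module Greeter
  def self.hello(name, punctuation)
    "hello #{name}#{punctuation}"
  end
end

# A job as the worker sees it: a "Receiver.method" string plus arguments.
job = { :method => "Greeter.hello", :args => ["world", "!"] }

receiver_name, message = job[:method].split(".")
# Substitution for this sketch: const_get instead of the worker's eval.
receiver = Object.const_get(receiver_name)
result = receiver.send(message, *job[:args])
```

The part before the dot names the receiver and the part after it names the method, so the job's args are splatted into an ordinary method call.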
#fork_and_work ⇒ Object
This method tells the Ruby process to fork. Define setup_child to hook into the forking process; it is a good place to re-establish database connections.
    # File 'lib/queue_classic/worker.rb', line 37
    def fork_and_work
      cpid = fork {setup_child; work}
      log(:at => :fork, :pid => cpid)
      Process.wait(cpid)
    end
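The fork-then-wait shape of this method can be tried on its own. This is a rough sketch that assumes a platform where `fork` is available (it is not on Windows or JRuby); the pipe and the comments marking the `setup_child` and `work` steps are illustrative stand-ins, not library code.

```ruby
reader, writer = IO.pipe

cpid = fork do
  reader.close
  # setup_child step: re-open per-process resources such as database connections.
  # work step: one unit of work; here the child just reports its own pid.
  writer.puts(Process.pid)
  writer.close
end

writer.close
Process.wait(cpid)                      # parent blocks until the child exits
child_status = $?                       # Process::Status of the reaped child
child_reported_pid = reader.read.to_i
reader.close
```

The parent does nothing else while the child runs, which matches the one-child-at-a-time behaviour of the method above.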
#handle_failure(job, e) ⇒ Object
This method will be called when an exception is raised during the execution of the job.
    # File 'lib/queue_classic/worker.rb', line 92
    def handle_failure(job,e)
      log(:at => "handle_failure", :job => job, :error => e.inspect)
    end
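Since the default only logs, a subclass can override handle_failure to do something else with the failed job. The sketch below uses `SketchWorker`, a hypothetical stand-in reduced to two methods so that it runs without the library; with queue_classic loaded, the subclass would presumably inherit from QC::Worker instead.

```ruby
# Hypothetical stand-in for QC::Worker, reduced to the two methods involved.
class SketchWorker
  def process(job)
    job[:work].call
  rescue => e
    handle_failure(job, e)
  end

  def handle_failure(job, e)
    # The real default logs the job and the error.
  end
end

# Subclass that keeps failed jobs in memory for later inspection.
class RecordingWorker < SketchWorker
  attr_reader :failed

  def initialize
    @failed = []
  end

  def handle_failure(job, e)
    @failed << { :id => job[:id], :error => e.inspect }
  end
end

worker = RecordingWorker.new
worker.process({ :id => 7, :work => -> { raise "boom" } })
```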
#lock_job ⇒ Object
Attempt to lock a job in the queue’s table. Returns a hash when a job is locked, or nil if the worker is stopped before a job is locked. The caller is responsible for deleting the job when finished.
    # File 'lib/queue_classic/worker.rb', line 55
    def lock_job
      log(:at => "lock_job")
      job = nil
      while @running
        break if job = @queue.lock
        Conn.wait(@queue.name)
      end
      job
    end
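The retry loop can be illustrated with an in-memory queue. `FakeQueue` is a hypothetical stand-in for the real queue object, and the `waits` counter takes the place of the `Conn.wait` call.

```ruby
# Hypothetical in-memory queue: lock returns nil until a job is available.
class FakeQueue
  def initialize(results)
    @results = results
  end

  def lock
    @results.shift
  end
end

queue = FakeQueue.new([nil, nil, { :id => 1, :method => "Kernel.puts", :args => ["hi"] }])
running = true
waits = 0
job = nil

while running
  break if (job = queue.lock)
  waits += 1 # stands in for Conn.wait(@queue.name); here we only count idle passes
end
```

In this sketch the loop makes two idle passes before the third `lock` call returns a job hash.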
#log(data) ⇒ Object
    # File 'lib/queue_classic/worker.rb', line 103
    def log(data)
      QC.log(data)
    end
#process(job) ⇒ Object
A job is processed by evaluating the target code. Errors are delegated to the handle_failure method. Also, this method will make the best attempt to delete the job from the queue before returning.
    # File 'lib/queue_classic/worker.rb', line 69
    def process(job)
      begin
        call(job)
      rescue => e
        handle_failure(job, e)
      ensure
        @queue.delete(job[:id])
        log(:at => "delete_job", :job => job[:id])
      end
    end
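The begin/rescue/ensure shape means a job is deleted whether or not it succeeds. A small sketch of that control flow follows; `run_job`, `deleted`, and `failures` are hypothetical names standing in for process, the queue's delete call, and handle_failure.

```ruby
deleted  = []
failures = []

# Hypothetical stand-in for process(job): run, record failures, always delete.
run_job = lambda do |job|
  begin
    job[:work].call
  rescue => e
    failures << [job[:id], e.message]
  ensure
    deleted << job[:id]
  end
end

run_job.call({ :id => 1, :work => -> { :ok } })
run_job.call({ :id => 2, :work => -> { raise ArgumentError, "bad input" } })
```

Both ids end up in `deleted`, so a failed job is not retried unless the failure handler arranges that itself. Note also that a bare `rescue => e` only catches StandardError and its subclasses.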
#setup_child ⇒ Object
This method should be overridden if your worker is forking and you need to re-establish database connections.
    # File 'lib/queue_classic/worker.rb', line 99
    def setup_child
      log(:at => "setup_child")
    end
#start ⇒ Object
Start a loop and work jobs indefinitely. Call this method to start the worker. This is the easiest way to start working jobs.
    # File 'lib/queue_classic/worker.rb', line 21
    def start
      while @running
        @fork_worker ? fork_and_work : work
      end
    end
#stop ⇒ Object
Call this method to stop the worker. The worker may not stop immediately if it is sleeping.
    # File 'lib/queue_classic/worker.rb', line 30
    def stop
      @running = false
    end
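The interaction between start and stop comes down to a single flag that is checked once per loop iteration. The sketch below uses `LoopingWorker`, a hypothetical stand-in whose `work` method stops the loop after three passes; a real deployment might call stop from a signal handler instead.

```ruby
# Hypothetical stand-in for QC::Worker showing only the running flag.
class LoopingWorker
  attr_reader :worked

  def initialize
    @running = true
    @worked = 0
  end

  def start
    work while @running
  end

  def stop
    @running = false
  end

  def work
    @worked += 1
    stop if @worked == 3 # stop only takes effect at the next loop check
  end
end

looping = LoopingWorker.new
looping.start
```

Because the flag is only read between iterations, a call to stop never interrupts a job that is already in progress.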