Class: QC::Worker
- Inherits:
-
Object
- Object
- QC::Worker
- Defined in:
- lib/queue_classic/worker.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#call(job) ⇒ Object
Each job includes a method column.
-
#fork_and_work ⇒ Object
This method will tell the ruby process to FORK.
-
#handle_failure(job, e) ⇒ Object
This method will be called when an exception is raised during the execution of the job.
-
#initialize(args = {}) ⇒ Worker
constructor
In the case no arguments are passed to the initializer, the defaults are pulled from the environment variables.
-
#lock_job ⇒ Object
lock_job will attempt to lock a job in the queue’s table.
- #log(data) ⇒ Object
-
#setup_child ⇒ Object
This method should be overriden if your worker is forking and you need to re-establish database connections.
-
#start ⇒ Object
Start a loop and work jobs indefinitely.
-
#stop ⇒ Object
Call this method to stop the worker.
-
#wait(t) ⇒ Object
If @listening_worker is set, the worker will use the database to sleep.
-
#work ⇒ Object
This method will lock a job & evaluate the code defined by the job.
Constructor Details
#initialize(args = {}) ⇒ Worker
In the case no arguments are passed to the initializer, the defaults are pulled from the environment variables.
7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/queue_classic/worker.rb', line 7 def initialize(args={}) @q_name = args[:q_name] ||= QC::QUEUE @top_bound = args[:top_bound] ||= QC::TOP_BOUND @fork_worker = args[:fork_worker] ||= QC::FORK_WORKER @listening_worker = args[:listening_worker] ||= QC::LISTENING_WORKER @max_attempts = args[:max_attempts] ||= QC::MAX_LOCK_ATTEMPTS @running = true @queue = Queue.new(@q_name, @listening_worker) log(args.merge(:at => "worker_initialized")) end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
4 5 6 |
# File 'lib/queue_classic/worker.rb', line 4 def queue @queue end |
Instance Method Details
#call(job) ⇒ Object
Each job includes a method column. We will use ruby’s eval to grab the ruby object from memory. We send the method to the object and pass the args.
96 97 98 99 100 101 |
# File 'lib/queue_classic/worker.rb', line 96 def call(job) args = job[:args] klass = eval(job[:method].split(".").first) = job[:method].split(".").last klass.send(, *args) end |
#fork_and_work ⇒ Object
This method will tell the ruby process to FORK. Define setup_child to hook into the forking process. Using setup_child is good for re-establishing database connections.
38 39 40 41 42 |
# File 'lib/queue_classic/worker.rb', line 38 def fork_and_work @cpid = fork {setup_child; work} log(:at => :fork, :pid => @cpid) Process.wait(@cpid) end |
#handle_failure(job, e) ⇒ Object
This method will be called when an exception is raised during the execution of the job.
123 124 125 |
# File 'lib/queue_classic/worker.rb', line 123 def handle_failure(job,e) log(:at => "handle_failure") end |
#lock_job ⇒ Object
lock_job will attempt to lock a job in the queue’s table. It uses an exponential backoff in the event that a job was not locked. This method will return a hash when a job is obtained.
This method will terminate early if the stop method is called or It is important that callers delete the job when finished. *@queue.delete(job)*
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/queue_classic/worker.rb', line 71 def lock_job log(:at => "lock_job") attempts = 0 job = nil until !@running || job job = @queue.lock(@top_bound) if job.nil? log(:at => "failed_lock", :attempts => attempts) if attempts < @max_attempts wait(2**attempts) attempts += 1 next else break end else log(:at => "finished_lock", :job => job[:id]) end end job end |
#log(data) ⇒ Object
134 135 136 |
# File 'lib/queue_classic/worker.rb', line 134 def log(data) QC.log(data) end |
#setup_child ⇒ Object
This method should be overriden if your worker is forking and you need to re-establish database connections
130 131 132 |
# File 'lib/queue_classic/worker.rb', line 130 def setup_child log(:at => "setup_child") end |
#start ⇒ Object
Start a loop and work jobs indefinitely. Call this method to start the worker. This is the easiest way to start working jobs.
22 23 24 25 26 |
# File 'lib/queue_classic/worker.rb', line 22 def start while @running @fork_worker ? fork_and_work : work end end |
#stop ⇒ Object
Call this method to stop the worker. The worker may not stop immediately if the worker is sleeping.
31 32 33 |
# File 'lib/queue_classic/worker.rb', line 31 def stop @running = false end |
#wait(t) ⇒ Object
If @listening_worker is set, the worker will use the database to sleep. The database approach preferred over a syscall since the database will break the sleep when new jobs are inserted into the queue.
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/queue_classic/worker.rb', line 107 def wait(t) if @listening_worker log(:at => "listen_wait", :wait => t) Conn.listen(@queue.chan) Conn.wait_for_notify(t) Conn.unlisten(@queue.chan) Conn.drain_notify log(:at => "finished_listening") else log(:at => "sleep_wait", :wait => t) Kernel.sleep(t) end end |
#work ⇒ Object
This method will lock a job & evaluate the code defined by the job. Also, this method will make the best attempt to delete the job from the queue before returning.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/queue_classic/worker.rb', line 47 def work if job = lock_job QC.log_yield(:at => "work", :job => job[:id]) do begin call(job) rescue => e handle_failure(job, e) ensure @queue.delete(job[:id]) log(:at => "delete_job", :job => job[:id]) end end end end |