Class: QC::Worker
- Inherits:
-
Object
- Object
- QC::Worker
- Defined in:
- lib/queue_classic/worker.rb
Overview
A Worker object can process jobs from one or many queues.
Instance Attribute Summary collapse
-
#queues ⇒ Object
Returns the value of attribute queues.
-
#running ⇒ Object
Returns the value of attribute running.
Instance Method Summary collapse
-
#call(job) ⇒ Object
Each job includes a method column.
-
#fork_and_work ⇒ Object
Calls Worker#work but after the current process is forked.
-
#handle_failure(job, e) ⇒ Object
This method will be called when a StandardError, ScriptError or NoMemoryError is raised during the execution of the job.
- #handle_success(queue, job) ⇒ Object
-
#initialize(args = {}) ⇒ Worker
constructor
Creates a new worker but does not start the worker.
-
#lock_job ⇒ Object
Attempt to lock a job in the queue’s table.
- #log(data) ⇒ Object
-
#process(queue, job) ⇒ Object
A job is processed by evaluating the target code.
-
#setup_child ⇒ Object
This method should be overriden if your worker is forking and you need to re-establish database connections.
-
#start ⇒ Object
Commences the working of jobs.
-
#stop ⇒ Object
Signals the worker to stop taking new work.
-
#work ⇒ Object
Blocks on locking a job, and once a job is locked, it will process the job.
Constructor Details
#initialize(args = {}) ⇒ Worker
Creates a new worker but does not start the worker. See Worker#start. This method takes a single hash argument. The following keys are read:
- fork_worker
-
Worker forks each job execution.
- wait_interval
-
Time to wait between failed lock attempts
- connection
-
PG::Connection object.
- q_name
-
Name of a single queue to process.
- q_names
-
Names of queues to process. Will process left to right.
- top_bound
-
Offset to the head of the queue. 1 == strict FIFO.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/queue_classic/worker.rb', line 21 def initialize(args={}) @fork_worker = args[:fork_worker] || QC.fork_worker? @wait_interval = args[:wait_interval] || QC.wait_time if args[:connection] @conn_adapter = ConnAdapter.new(connection: args[:connection]) else @conn_adapter = QC.default_conn_adapter end @queues = setup_queues(@conn_adapter, (args[:q_name] || QC.queue), (args[:q_names] || QC.queues), (args[:top_bound] || QC.top_bound)) log(args.merge(:at => "worker_initialized")) @running = true end |
Instance Attribute Details
#queues ⇒ Object
Returns the value of attribute queues.
11 12 13 |
# File 'lib/queue_classic/worker.rb', line 11 def queues @queues end |
#running ⇒ Object
Returns the value of attribute running.
11 12 13 |
# File 'lib/queue_classic/worker.rb', line 11 def running @running end |
Instance Method Details
#call(job) ⇒ Object
Each job includes a method column. We will use ruby’s eval to grab the ruby object from memory. We send the method to the object and pass the args.
135 136 137 138 139 140 |
# File 'lib/queue_classic/worker.rb', line 135 def call(job) args = job[:args] receiver_str, _, = job[:method].rpartition('.') receiver = eval(receiver_str) receiver.send(, *args) end |
#fork_and_work ⇒ Object
Calls Worker#work but after the current process is forked. The parent process will wait on the child process to exit.
65 66 67 68 69 |
# File 'lib/queue_classic/worker.rb', line 65 def fork_and_work cpid = fork {setup_child; work} log(:at => :fork, :pid => cpid) Process.wait(cpid) end |
#handle_failure(job, e) ⇒ Object
This method will be called when a StandardError, ScriptError or NoMemoryError is raised during the execution of the job.
148 149 150 |
# File 'lib/queue_classic/worker.rb', line 148 def handle_failure(job,e) $stderr.puts("count#qc.job-error=1 job=#{job} error=#{e.inspect} at=#{e.backtrace.first}") end |
#handle_success(queue, job) ⇒ Object
142 143 144 |
# File 'lib/queue_classic/worker.rb', line 142 def handle_success(queue, job) queue.delete(job[:id]) end |
#lock_job ⇒ Object
Attempt to lock a job in the queue’s table. If a job can be locked, this method returns an array with 2 elements. The first element is the queue from which the job was locked and the second is a hash representation of the job. If a job is returned, its locked_at column has been set in the job’s row. It is the caller’s responsibility to delete the job row from the table when the job is complete.
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/queue_classic/worker.rb', line 89 def lock_job log(:at => "lock_job") job = nil while @running @queues.each do |queue| if job = queue.lock return [queue, job] end end @conn_adapter.wait(@wait_interval, *@queues.map {|q| q.name}) end end |
#log(data) ⇒ Object
159 160 161 |
# File 'lib/queue_classic/worker.rb', line 159 def log(data) QC.log(data) end |
#process(queue, job) ⇒ Object
A job is processed by evaluating the target code. if the job is evaluated with no exceptions then it is deleted from the queue. If the job has raised an exception the responsibility of what to do with the job is delegated to Worker#handle_failure. If the job is not finished and an INT signal is trapped, this method will unlock the job in the queue.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/queue_classic/worker.rb', line 109 def process(queue, job) start = Time.now finished = false begin call(job).tap do handle_success(queue, job) finished = true end rescue StandardError, ScriptError, NoMemoryError => e # We really only want to unlock the job for signal and system exit # exceptions. If we encounter a ScriptError or a NoMemoryError any # future run will likely encounter the same error. handle_failure(job, e) finished = true ensure if !finished queue.unlock(job[:id]) end ttp = Integer((Time.now - start) * 1000) QC.measure("time-to-process=#{ttp} source=#{queue.name}") end end |
#setup_child ⇒ Object
This method should be overriden if your worker is forking and you need to re-establish database connections
155 156 157 |
# File 'lib/queue_classic/worker.rb', line 155 def setup_child log(:at => "setup_child") end |
#start ⇒ Object
Commences the working of jobs. start() spins on @running –which is initialized as true. This method is the primary entry point to starting the worker. The canonical example of starting a worker is as follows: QC::Worker.new.start
44 45 46 47 48 49 50 |
# File 'lib/queue_classic/worker.rb', line 44 def start QC.unlock_jobs_of_dead_workers while @running @fork_worker ? fork_and_work : work end end |
#stop ⇒ Object
Signals the worker to stop taking new work. This method has no immediate effect. However, there are two loops in the worker (one in #start and another in #lock_job) which check the @running variable to determine if further progress is desirable. In the case that @running is false, the aforementioned methods will short circuit and cause the blocking call to #start to unblock.
59 60 61 |
# File 'lib/queue_classic/worker.rb', line 59 def stop @running = false end |