Class: QC::Worker

Inherits:
Object
Defined in:
lib/queue_classic/worker.rb

Constructor Details

#initialize(args = {}) ⇒ Worker

If no arguments are passed to the initializer, the defaults (QC::QUEUE and QC::FORK_WORKER) are taken from environment variables.



# File 'lib/queue_classic/worker.rb', line 11

def initialize(args={})
  @fork_worker = args[:fork_worker] || QC::FORK_WORKER
  @queue = QC::Queue.new((args[:q_name] || QC::QUEUE), args[:top_bound])
  log(args.merge(:at => "worker_initialized"))
  @running = true
end
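
The fallback used by the constructor can be tried in isolation. The sketch below is not part of queue_classic: `resolve_options`, its `env` parameter, and the variable names `EXAMPLE_QUEUE` and `EXAMPLE_FORK_WORKER` are hypothetical stand-ins for however QC::QUEUE and QC::FORK_WORKER are actually populated. It also shows a consequence of `||`: an explicit `false` falls through to the default.

```ruby
# Hypothetical helper mirroring the `args[:key] || DEFAULT` pattern above.
# `env` stands in for ENV so the example does not depend on the real environment.
def resolve_options(args = {}, env = {})
  default_queue = env["EXAMPLE_QUEUE"] || "default"  # assumed variable name
  default_fork  = !env["EXAMPLE_FORK_WORKER"].nil?   # assumed variable name
  {
    :q_name      => args[:q_name] || default_queue,
    :fork_worker => args[:fork_worker] || default_fork
  }
end

env = { "EXAMPLE_QUEUE" => "mail", "EXAMPLE_FORK_WORKER" => "true" }

from_env  = resolve_options({}, env)
from_args = resolve_options({ :q_name => "reports" }, env)

# Because `false || default` evaluates to `default`, passing false
# does not switch forking off when the environment enables it.
still_forking = resolve_options({ :fork_worker => false }, env)
```

With the real class, the equivalent call would be along the lines of `QC::Worker.new(:q_name => "reports", :fork_worker => true)`, using the keys read in the constructor above.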

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



# File 'lib/queue_classic/worker.rb', line 8

def queue
  @queue
end

#running ⇒ Object

Returns the value of attribute running.



# File 'lib/queue_classic/worker.rb', line 8

def running
  @running
end

Instance Method Details

#call(job) ⇒ Object

Each job includes a method column of the form "Receiver.method". Ruby's eval resolves the receiver named before the dot to the object in memory; the method named after the dot is then sent to that object with the job's args.



# File 'lib/queue_classic/worker.rb', line 83

def call(job)
  args = job[:args]
  klass = eval(job[:method].split(".").first)
  message = job[:method].split(".").last
  klass.send(message, *args)
end
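
The dispatch in #call can be followed step by step with a plain hash. This is a minimal sketch: the `Mailer` module and the job contents are made up for illustration, and only the split, eval, and send steps are taken from the method above.

```ruby
# Hypothetical job target; any object reachable by name would do.
module Mailer
  def self.deliver(to, subject)
    "sent #{subject} to #{to}"
  end
end

# A job as #call sees it: a "Receiver.method" string plus an args array.
job = { :method => "Mailer.deliver", :args => ["a@example.com", "hi"] }

receiver = eval(job[:method].split(".").first)  # "Mailer"  -> the Mailer module
message  = job[:method].split(".").last         # "deliver" -> method name
result   = receiver.send(message, *job[:args])  # Mailer.deliver("a@example.com", "hi")
```

Because eval executes whatever string it is given, the method column should only ever hold trusted values.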

#fork_and_work ⇒ Object

This method forks the Ruby process and waits for the child to exit. Define setup_child to hook into the forking process; it runs in the child before any work is done, which makes it a good place to re-establish database connections.



# File 'lib/queue_classic/worker.rb', line 37

def fork_and_work
  cpid = fork {setup_child; work}
  log(:at => :fork, :pid => cpid)
  Process.wait(cpid)
end
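
The order of events around the fork can be observed with a pipe. This is a sketch under assumptions, not the library's code: `ForkSketch` is a stand-in class that keeps only the fork, hook, and wait shape of the method above, and it assumes a platform where Kernel#fork is available.

```ruby
# Stand-in class, not QC::Worker.
class ForkSketch
  def initialize(writer)
    @writer = writer
  end

  # Hook that runs in the child before any work is done.
  def setup_child
    @writer.puts "setup_child in pid #{Process.pid}"
  end

  def work
    @writer.puts "work"
  end

  def fork_and_work
    cpid = fork { setup_child; work }
    Process.wait(cpid)  # block until the child has exited
    cpid
  end
end

reader, writer = IO.pipe
writer.sync = true      # write through immediately, no buffering
cpid = ForkSketch.new(writer).fork_and_work
writer.close            # close the parent's copy so read sees EOF
lines = reader.read.split("\n")
reader.close
```

The first line written carries the child's pid, which shows that setup_child ran inside the forked process and not in the parent.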

#handle_failure(job, e) ⇒ Object

This method will be called when an exception is raised during the execution of the job.



# File 'lib/queue_classic/worker.rb', line 92

def handle_failure(job,e)
  log(:at => "handle_failure", :job => job, :error => e.inspect)
end

#lock_job ⇒ Object

Attempts to lock a job in the queue's table. Returns a hash when a job is locked, or nil if the worker is stopped before a job could be locked. The caller is responsible for deleting the job when finished.



# File 'lib/queue_classic/worker.rb', line 55

def lock_job
  log(:at => "lock_job")
  job = nil
  while @running
    break if job = @queue.lock
    Conn.wait(@queue.name)
  end
  job
end
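
The lock-or-wait loop can be exercised without a database. In this sketch `ScriptedQueue` and the free-standing `lock_job` function are hypothetical stand-ins: the queue returns pre-scripted lock results, and its `wait` method only counts calls where the real code blocks in Conn.wait.

```ruby
# Hypothetical queue whose lock results are scripted in advance.
class ScriptedQueue
  attr_reader :waits

  def initialize(results)
    @results = results
    @waits = 0
  end

  # Returns the next scripted result: nil (nothing to lock) or a job hash.
  def lock
    @results.shift
  end

  # Stand-in for Conn.wait; counts calls and returns immediately.
  def wait
    @waits += 1
  end
end

# Same loop shape as the method above; `running` replaces @running.
def lock_job(queue, running = lambda { true })
  job = nil
  while running.call
    break if (job = queue.lock)
    queue.wait
  end
  job
end

queue = ScriptedQueue.new([nil, nil, { :id => 7 }])
job = lock_job(queue)                           # two misses, then a hit

stopped = lock_job(ScriptedQueue.new([]), lambda { false })
```

The second call returns nil because the loop body never runs, which is why #work guards with `if job = lock_job`.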

#log(data) ⇒ Object



# File 'lib/queue_classic/worker.rb', line 103

def log(data)
  QC.log(data)
end

#process(job) ⇒ Object

A job is processed by evaluating the target code. Errors are delegated to the handle_failure method. Also, this method will make the best attempt to delete the job from the queue before returning.



# File 'lib/queue_classic/worker.rb', line 69

def process(job)
  begin
    call(job)
  rescue => e
    handle_failure(job, e)
  ensure
    @queue.delete(job[:id])
    log(:at => "delete_job", :job => job[:id])
  end
end
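
The rescue and ensure behaviour can be checked with in-memory stand-ins. Everything named here (`InMemoryQueue`, `SketchWorker`, the `:work` lambda inside each job) is hypothetical; only the begin/rescue/ensure structure follows the method above. Note that a bare `rescue => e` catches StandardError and its subclasses only.

```ruby
# Hypothetical queue that records which job ids were deleted.
class InMemoryQueue
  attr_reader :deleted

  def initialize
    @deleted = []
  end

  def delete(id)
    @deleted << id
  end
end

# Stand-in worker; handle_failure collects errors instead of logging them.
class SketchWorker
  attr_reader :failures

  def initialize(queue)
    @queue = queue
    @failures = []
  end

  def call(job)
    job[:work].call
  end

  def handle_failure(job, e)
    @failures << [job[:id], e.message]
  end

  def process(job)
    begin
      call(job)
    rescue => e
      handle_failure(job, e)
    ensure
      @queue.delete(job[:id])  # runs whether or not call raised
    end
  end
end

queue  = InMemoryQueue.new
worker = SketchWorker.new(queue)
worker.process(:id => 1, :work => lambda { :ok })
worker.process(:id => 2, :work => lambda { raise "boom" })
```

Both jobs end up deleted, and only the second one reaches handle_failure. A failed job is therefore gone from the queue unless an overridden handle_failure stores or re-enqueues it.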

#setup_child ⇒ Object

This method should be overridden if your worker is forking and you need to re-establish database connections.



# File 'lib/queue_classic/worker.rb', line 99

def setup_child
  log(:at => "setup_child")
end

#start ⇒ Object

Start a loop and work jobs indefinitely. Call this method to start the worker. This is the easiest way to start working jobs.



# File 'lib/queue_classic/worker.rb', line 21

def start
  while @running
    @fork_worker ? fork_and_work : work
  end
end

#stop ⇒ Object

Call this method to stop the worker. The worker may not stop immediately if the worker is sleeping.



# File 'lib/queue_classic/worker.rb', line 30

def stop
  @running = false
end
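
How #start and #stop interact through the @running flag can be sketched as follows. `LoopSketch` is a hypothetical stand-in that works through an in-memory array; for simplicity it calls stop from inside work once the array is empty, whereas a real worker would be stopped from outside the loop.

```ruby
# Stand-in worker: same start/stop shape, jobs come from an array.
class LoopSketch
  attr_reader :worked, :running

  def initialize(jobs)
    @jobs = jobs
    @worked = []
    @running = true
  end

  def work
    @worked << @jobs.shift
    stop if @jobs.empty?  # assumption: stop once nothing is left
  end

  # The flag is checked once per iteration, before the next job is taken.
  def start
    while @running
      work
    end
  end

  def stop
    @running = false
  end
end

sketch = LoopSketch.new([:a, :b, :c])
sketch.start
```

Because the flag is only read at the top of the loop, a call to stop takes effect after the current iteration finishes, not in the middle of it.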

#work ⇒ Object

This method locks a job and then processes it. If no job could be locked, it does nothing.



# File 'lib/queue_classic/worker.rb', line 44

def work
  if job = lock_job
    QC.log_yield(:at => "work", :job => job[:id]) do
      process(job)
    end
  end
end