Class: QC::Worker

Inherits:
Object
Defined in:
lib/queue_classic/worker.rb

Constructor Details

#initialize(args = {}) ⇒ Worker

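Any option that is not passed to the initializer falls back to the corresponding QC constant (QC::QUEUE, QC::TOP_BOUND, QC::FORK_WORKER, QC::LISTENING_WORKER, QC::MAX_LOCK_ATTEMPTS), whose defaults are pulled from environment variables.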



# File 'lib/queue_classic/worker.rb', line 7

def initialize(args={})
  @q_name           = args[:q_name]           ||= QC::QUEUE
  @top_bound        = args[:top_bound]        ||= QC::TOP_BOUND
  @fork_worker      = args[:fork_worker]      ||= QC::FORK_WORKER
  @listening_worker = args[:listening_worker] ||= QC::LISTENING_WORKER
  @max_attempts     = args[:max_attempts]     ||= QC::MAX_LOCK_ATTEMPTS

  @running = true
  @queue = Queue.new(@q_name, @listening_worker)
  log(args.merge(:at => "worker_initialized"))
end
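
The `args[:key] ||= DEFAULT` pattern in the initializer has two consequences that are easy to miss: a `false` value is replaced by the default, and the caller's hash is modified in place. The following is a minimal sketch of that behavior in plain Ruby. The constants and the `resolve_options` helper are hypothetical stand-ins, not part of queue_classic.

```ruby
# Hypothetical stand-ins for the QC constants; not the library's real values.
DEFAULT_QUEUE       = "default"
DEFAULT_FORK_WORKER = true

# Mirrors the `args[:key] ||= DEFAULT` pattern from the initializer above.
def resolve_options(args = {})
  q_name      = args[:q_name]      ||= DEFAULT_QUEUE
  fork_worker = args[:fork_worker] ||= DEFAULT_FORK_WORKER
  { q_name: q_name, fork_worker: fork_worker }
end

explicit = resolve_options(q_name: "priority")   # explicit value wins
implicit = resolve_options                       # every default applies
disabled = resolve_options(fork_worker: false)   # false is falsy, so the default applies

caller_opts = {}
resolve_options(caller_opts)                     # ||= writes back into the caller's hash
```

If the library version you use behaves the same way, disabling a boolean option may need to happen through its environment variable rather than through `args`.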

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/queue_classic/worker.rb', line 4

def queue
  @queue
end

Instance Method Details

#call(job) ⇒ Object

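Each job includes a method column holding a string of the form "Receiver.message". The part before the first dot is passed to Ruby's eval to obtain the receiver object, and the part after the last dot is sent to that object as a message, with the job's args as arguments.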



# File 'lib/queue_classic/worker.rb', line 96

def call(job)
  args = job[:args]
  klass = eval(job[:method].split(".").first)
  message = job[:method].split(".").last
  klass.send(message, *args)
end
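
To make the receiver/message split concrete, here is a small sketch of the same dispatch. It swaps `eval` for `Object.const_get`, which is a different technique that only resolves constant names. The `Mailer` module and the `dispatch` helper are hypothetical and exist only for this illustration.

```ruby
# Hypothetical job target, defined only for this illustration.
module Mailer
  def self.deliver(to, subject)
    "sent #{subject.inspect} to #{to}"
  end
end

# Same receiver/message split as #call, but the receiver is resolved with
# Object.const_get (a swapped-in alternative to eval).
def dispatch(job)
  parts    = job[:method].split(".")
  receiver = Object.const_get(parts.first)
  receiver.public_send(parts.last, *job[:args])
end

job    = { method: "Mailer.deliver", args: ["ops@example.com", "hello"] }
result = dispatch(job)
```

The trade-off is that `eval` accepts any Ruby expression before the first dot, while `const_get` raises a `NameError` for anything that is not a constant name.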

#fork_and_work ⇒ Object

This method forks the Ruby process. The child calls setup_child and then work, while the parent waits for the child to exit. Override setup_child to hook into the forking process, for example to re-establish database connections.



# File 'lib/queue_classic/worker.rb', line 38

def fork_and_work
  @cpid = fork {setup_child; work}
  log(:at => :fork, :pid => @cpid)
  Process.wait(@cpid)
end
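
The fork-then-wait shape can be sketched in plain Ruby as follows. The two helper methods are hypothetical stand-ins for the worker's hooks, and the pipe is only there so the parent can observe what the child did. This assumes a platform where `fork` is available.

```ruby
# Hypothetical stand-ins for the worker's hooks.
def setup_child(log)
  log.puts "setup_child"   # e.g. reconnect to the database here
end

def work(log)
  log.puts "work"
end

reader, writer = IO.pipe

child_pid = fork do
  reader.close
  setup_child(writer)      # runs in the child, before any job is worked
  work(writer)
  writer.close             # flush before leaving the child
  exit!(0)                 # skip at_exit handlers inherited from the parent
end

writer.close               # parent keeps only the read end
Process.wait(child_pid)    # block until the child exits, as fork_and_work does
exit_status  = $?.exitstatus
child_output = reader.read.lines.map(&:chomp)
reader.close
```

Because the parent blocks in `Process.wait`, only one child runs at a time per worker.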

#handle_failure(job, e) ⇒ Object

This method will be called when an exception is raised during the execution of the job. The default implementation only logs the event; override it to add your own failure handling.



# File 'lib/queue_classic/worker.rb', line 123

def handle_failure(job,e)
  log(:at => "handle_failure")
end

#lock_job ⇒ Object

lock_job will attempt to lock a job in the queue’s table. If no job could be locked, it backs off exponentially, waiting 2**attempts seconds before the next attempt. This method will return a hash when a job is obtained.

This method will terminate early, returning nil, if the stop method is called or @max_attempts has been reached.

It is important that callers delete the job when finished, as work does with *@queue.delete(job[:id])*.



# File 'lib/queue_classic/worker.rb', line 71

def lock_job
  log(:at => "lock_job")
  attempts = 0
  job = nil
  until !@running || job
    job = @queue.lock(@top_bound)
    if job.nil?
      log(:at => "failed_lock", :attempts => attempts)
      if attempts < @max_attempts
        wait(2**attempts)
        attempts += 1
        next
      else
        break
      end
    else
      log(:at => "finished_lock", :job => job[:id])
    end
  end
  job
end
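
The backoff schedule is easier to see when the waits are recorded instead of slept. The sketch below follows the control flow of lock_job but leaves out logging and the @running check. `StubQueue` and `lock_with_backoff` are hypothetical names used only here.

```ruby
# Stand-in queue whose lock succeeds on the Nth call (or never, if nil).
class StubQueue
  attr_reader :lock_calls

  def initialize(succeed_on: nil)
    @succeed_on = succeed_on
    @lock_calls = 0
  end

  def lock
    @lock_calls += 1
    @lock_calls == @succeed_on ? { id: 1 } : nil
  end
end

# Same control flow as lock_job above, with the waits recorded instead of slept.
def lock_with_backoff(queue, max_attempts)
  waits    = []
  attempts = 0
  job      = nil
  until job
    job = queue.lock
    next if job                          # locked: the loop condition ends the loop
    break unless attempts < max_attempts # give up once the limit is reached
    waits << 2**attempts                 # 1, 2, 4, 8, ...
    attempts += 1
  end
  [job, waits]
end

never = StubQueue.new
job_a, waits_a = lock_with_backoff(never, 5)   # nil, [1, 2, 4, 8, 16]

third = StubQueue.new(succeed_on: 3)
job_b, waits_b = lock_with_backoff(third, 5)   # {id: 1}, [1, 2]
```

With a limit of 5, an empty queue is polled six times and the waits add up to 31 seconds before the method gives up.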

#log(data) ⇒ Object



# File 'lib/queue_classic/worker.rb', line 134

def log(data)
  QC.log(data)
end

#setup_child ⇒ Object

This method should be overridden if your worker is forking and you need to re-establish database connections.



# File 'lib/queue_classic/worker.rb', line 130

def setup_child
  log(:at => "setup_child")
end

#start ⇒ Object

Starts a loop that works jobs until stop is called. Each iteration calls fork_and_work if @fork_worker is set, and work otherwise. This is the easiest way to start working jobs.



# File 'lib/queue_classic/worker.rb', line 22

def start
  while @running
    @fork_worker ? fork_and_work : work
  end
end

#stop ⇒ Object

Call this method to stop the worker. The worker may not stop immediately if the worker is sleeping.



# File 'lib/queue_classic/worker.rb', line 31

def stop
  @running = false
end
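
The interaction between start and stop can be sketched with a stand-in class that has the same loop shape. `LoopingWorker` is hypothetical, and its `work` method is a placeholder for locking and running one job.

```ruby
# Minimal stand-in with the same start/stop shape as the worker above.
class LoopingWorker
  attr_reader :iterations

  def initialize
    @running    = true
    @iterations = 0
  end

  # Loops until stop flips the flag; the flag is checked between iterations.
  def start
    work while @running
  end

  def stop
    @running = false
  end

  private

  # Placeholder for locking and running one job.
  def work
    @iterations += 1
    sleep 0.01
  end
end

worker = LoopingWorker.new
thread = Thread.new { worker.start }
sleep 0.05
worker.stop
finished = !thread.join(2).nil?   # join returns nil only if the timeout expires
```

In a long-running process, one common arrangement (an assumption about deployment, not something this page prescribes) is to call `stop` from a signal handler such as `trap("INT") { worker.stop }`.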

#wait(t) ⇒ Object

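If @listening_worker is set, the worker will use the database to sleep: it listens on the queue's channel and waits up to t seconds for a notification. The database approach is preferred over a syscall, since the database will break the sleep when new jobs are inserted into the queue. Otherwise the worker sleeps for t seconds with Kernel.sleep.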



# File 'lib/queue_classic/worker.rb', line 107

def wait(t)
  if @listening_worker
    log(:at => "listen_wait", :wait => t)
    Conn.listen(@queue.chan)
    Conn.wait_for_notify(t)
    Conn.unlisten(@queue.chan)
    Conn.drain_notify
    log(:at => "finished_listening")
  else
    log(:at => "sleep_wait", :wait => t)
    Kernel.sleep(t)
  end
end
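
The difference between a plain sleep and a wait that a notification can cut short can be shown without a database. The sketch below is an in-process analogy only: a pipe stands in for the notification channel and `IO.select` stands in for waiting on the connection. `wait_for_signal` is a hypothetical helper.

```ruby
# In-process stand-in for LISTEN/NOTIFY: a pipe plays the notification channel.
reader, writer = IO.pipe

# Waits up to `timeout` seconds, returning early if a notification arrives.
def wait_for_signal(reader, timeout)
  ready = IO.select([reader], nil, nil, timeout)
  return :timed_out if ready.nil?
  reader.read_nonblock(1024)   # drain pending data, in the spirit of Conn.drain_notify
  :notified
end

quiet = wait_for_signal(reader, 0.05)   # nothing written yet, so the timeout expires

writer.write("x")                       # a "job inserted" notification
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
woken   = wait_for_signal(reader, 5)    # returns well before the 5 second timeout
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

A worker that sleeps with `Kernel.sleep` would always spend the full interval idle, even when a job arrives a moment after the sleep starts.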

#work ⇒ Object

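This method will lock a job and evaluate the code defined by the job. If the job raises an exception, handle_failure is called with the job and the exception. The job is deleted from the queue in an ensure block, so the method makes a best attempt to delete it before returning, whether or not the job succeeded.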



# File 'lib/queue_classic/worker.rb', line 47

def work
  if job = lock_job
    QC.log_yield(:at => "work", :job => job[:id]) do
      begin
        call(job)
      rescue => e
        handle_failure(job, e)
      ensure
        @queue.delete(job[:id])
        log(:at => "delete_job", :job => job[:id])
      end
    end
  end
end
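
The begin/rescue/ensure flow, together with a custom handle_failure, can be sketched with stand-in classes. `RecordingQueue` and `SketchWorker` are hypothetical, and the job here carries a lambda in place of the method string a real job would have.

```ruby
# Stand-in queue that records which job ids were deleted.
class RecordingQueue
  attr_reader :deleted

  def initialize
    @deleted = []
  end

  def delete(id)
    @deleted << id
  end
end

# Mirrors the begin/rescue/ensure shape of #work for a single job.
class SketchWorker
  attr_reader :failures

  def initialize(queue)
    @queue    = queue
    @failures = []
  end

  def work(job)
    job[:run].call
  rescue => e
    handle_failure(job, e)
  ensure
    @queue.delete(job[:id])   # runs whether or not the job raised
  end

  # Override point: failures are collected here instead of only logged.
  def handle_failure(job, e)
    @failures << [job[:id], e.message]
  end
end

queue  = RecordingQueue.new
worker = SketchWorker.new(queue)
worker.work({ id: 1, run: -> { :ok } })
worker.work({ id: 2, run: -> { raise "boom" } })
```

Both jobs end up deleted, and only the second one reaches `handle_failure`. Note that a bare `rescue => e` catches `StandardError` and its subclasses, so other exceptions skip the handler but still pass through `ensure`.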