Module: Delayed::Master::Worker::ThreadWorker

Included in:
Worker
Defined in:
lib/delayed/master/worker/thread_worker.rb

Instance Method Summary collapse

Instance Method Details

#multithread?Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/delayed/master/worker/thread_worker.rb', line 17

def multithread?
  @max_threads.to_i > 1
end

#run_one_job(job) ⇒ Object



57
58
59
# File 'lib/delayed/master/worker/thread_worker.rb', line 57

def run_one_job(job)
  self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) }
end

#work_off(num = 100) ⇒ Object



9
10
11
12
13
14
15
# File 'lib/delayed/master/worker/thread_worker.rb', line 9

def work_off(num = 100)
  if multithread?
    work_off_for_multithread
  else
    super
  end
end

#work_off_for_multithreadObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/delayed/master/worker/thread_worker.rb', line 21

def work_off_for_multithread
  success = 0
  failure = 0

  monitor = Monitor.new
  thread_pool = ThreadPool.new(self, @max_threads)

  thread_pool.schedule do
    self.class.lifecycle.run_callbacks(:scheduler_thread, self) do
      if stop?
        next nil
      else
        next reserve_job
      end
    end
  end

  thread_pool.work do |job|
    @master_logger.debug { "start worker thread #{Thread.current.object_id}" }
    self.class.lifecycle.run_callbacks(:worker_thread, self, job) do
      case run_one_job(job)
      when true
        monitor.synchronize { success += 1 }
      when false
        monitor.synchronize { failure += 1 }
      end
    end
    @master_logger.debug { "stop worker thread #{Thread.current.object_id}" }
  end

  thread_pool.wait
  thread_pool.shutdown

  [success, failure]
end