Class: ThreadJob::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/thread_job/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(max_size = 5, logger = Logger.new(STDOUT)) ⇒ ThreadPool

Returns a new instance of ThreadPool.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/thread_job/thread_pool.rb', line 6

def initialize(max_size=5, logger=Logger.new(STDOUT))
  @queue = Queue.new
  @logger = logger
  @avail_pool = max_size.times.map do
    Thread.new do
      @logger.debug("[ThreadPool] started thread #{Thread.current}")
      while true
        monitor_queue
      end
    end
  end
  @use_pool = []
  @mutex = Mutex.new
end

Instance Method Details

#add_workers(num_workers) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/thread_job/thread_pool.rb', line 28

def add_workers(num_workers)
  num_workers.times do
    thread = Thread.new do
      @logger.debug("[ThreadPool] started thread #{Thread.current}")
      while true
        monitor_queue
      end
    end
    @avail_pool.push(thread)
  end
end

#has_available_thread?Boolean

Returns:

  • (Boolean)


21
22
23
24
25
26
# File 'lib/thread_job/thread_pool.rb', line 21

def has_available_thread?
  @mutex.synchronize {
    @logger.debug("[ThreadPool] #{@avail_pool.length} threads available, #{@use_pool.length} threads in use")
    return @avail_pool.length > 0
  }
end

#killObject



40
41
42
43
44
45
46
47
48
# File 'lib/thread_job/thread_pool.rb', line 40

def kill
  @avail_pool.each do |avail_thread|
    avail_thread.kill
  end

  @use_pool.each do |used_thread|
    used_thread.kill
  end
end

#monitor_queueObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/thread_job/thread_pool.rb', line 50

def monitor_queue
  work = @queue.pop
  if work
    @mutex.synchronize {
      @use_pool.push(Thread.current)
      @avail_pool.delete(Thread.current)
    }

    @logger.debug("[ThreadPool] Running job '#{work[:job_name]}' on thread #{Thread.current}")
    begin
      work[:job].run
    rescue => e
      @logger.error("[ThreadPool] Worker thread #{Thread.current} encountered an error #{e} while processing job '#{work[:job_name]}'")
      work[:job_store].fail_job(work[:queue_name], work[:id])
      @mutex.synchronize {
        @avail_pool.push(Thread.current)
        @use_pool.delete(Thread.current)
      }
      return
    end
    work[:job_store].complete_job(work[:queue_name], work[:id])

    @mutex.synchronize {
      @avail_pool.push(Thread.current)
      @use_pool.delete(Thread.current)
    }
  end
end

#run(job_hash) ⇒ Object



79
80
81
# File 'lib/thread_job/thread_pool.rb', line 79

def run(job_hash)
  @queue.push(job_hash)
end