Class: ThreadJob::ThreadPool
- Inherits:
-
Object
- Object
- ThreadJob::ThreadPool
- Defined in:
- lib/thread_job/thread_pool.rb
Instance Method Summary collapse
- #add_workers(num_workers) ⇒ Object
- #has_available_thread? ⇒ Boolean
-
#initialize(max_size = 5, logger = Logger.new(STDOUT)) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #kill ⇒ Object
- #monitor_queue ⇒ Object
- #run(job_hash) ⇒ Object
Constructor Details
#initialize(max_size = 5, logger = Logger.new(STDOUT)) ⇒ ThreadPool
Returns a new instance of ThreadPool.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/thread_job/thread_pool.rb', line 6 def initialize(max_size=5, logger=Logger.new(STDOUT)) @queue = Queue.new @logger = logger @avail_pool = max_size.times.map do Thread.new do @logger.debug("[ThreadPool] started thread #{Thread.current}") while true monitor_queue end end end @use_pool = [] @mutex = Mutex.new end |
Instance Method Details
#add_workers(num_workers) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/thread_job/thread_pool.rb', line 28 def add_workers(num_workers) num_workers.times do thread = Thread.new do @logger.debug("[ThreadPool] started thread #{Thread.current}") while true monitor_queue end end @avail_pool.push(thread) end end |
#has_available_thread? ⇒ Boolean
21 22 23 24 25 26 |
# File 'lib/thread_job/thread_pool.rb', line 21 def has_available_thread? @mutex.synchronize { @logger.debug("[ThreadPool] #{@avail_pool.length} threads available, #{@use_pool.length} threads in use") return @avail_pool.length > 0 } end |
#kill ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/thread_job/thread_pool.rb', line 40 def kill @avail_pool.each do |avail_thread| avail_thread.kill end @use_pool.each do |used_thread| used_thread.kill end end |
#monitor_queue ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/thread_job/thread_pool.rb', line 50 def monitor_queue work = @queue.pop if work @mutex.synchronize { @use_pool.push(Thread.current) @avail_pool.delete(Thread.current) } @logger.debug("[ThreadPool] Running job '#{work[:job_name]}' on thread #{Thread.current}") begin work[:job].run rescue => e @logger.error("[ThreadPool] Worker thread #{Thread.current} encountered an error #{e} while processing job '#{work[:job_name]}'") work[:job_store].fail_job(work[:queue_name], work[:id]) @mutex.synchronize { @avail_pool.push(Thread.current) @use_pool.delete(Thread.current) } return end work[:job_store].complete_job(work[:queue_name], work[:id]) @mutex.synchronize { @avail_pool.push(Thread.current) @use_pool.delete(Thread.current) } end end |
#run(job_hash) ⇒ Object
79 80 81 |
# File 'lib/thread_job/thread_pool.rb', line 79 def run(job_hash) @queue.push(job_hash) end |