Class: Workling::Invokers::ThreadPoolPoller
Defined in: lib/workling/invokers/thread_pool_poller.rb
Instance Attribute Summary
- #pool_capacity ⇒ Object (readonly): Returns the value of attribute pool_capacity.
- #reset_time ⇒ Object (readonly): Returns the value of attribute reset_time.
- #sleep_time ⇒ Object (readonly): Returns the value of attribute sleep_time.
Instance Method Summary
- #initialize(routing, client_class) ⇒ ThreadPoolPoller (constructor): A new instance of ThreadPoolPoller.
- #listen ⇒ Object: Start up the checking for items on the queue.
- #poller_threads ⇒ Object: Number of currently active polling threads.
- #stop ⇒ Object: Instructs the thread pool poller to stop checking for new jobs on the backing queue.
- #worker_threads ⇒ Object: Number of currently active worker threads.
- #workers_available? ⇒ Boolean: Returns true while the number of active worker threads is below pool_capacity. Set pool_size in the Workling config to adjust the maximum number of threads in the pool.
Methods inherited from Base
Constructor Details
#initialize(routing, client_class) ⇒ ThreadPoolPoller
Returns a new instance of ThreadPoolPoller.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 32
    def initialize(routing, client_class)
      @routing = routing
      @client_class = client_class

      # Grab settings out of the config file
      @sleep_time = (Workling.config[:sleep_time] || 2).to_f
      @reset_time = (Workling.config[:reset_time] || 30).to_f

      # Pool of polling threads
      @pollers = []
      @pollers.extend(Mutex_m)

      # Pool of worker threads
      @workers = []
      @workers.extend(Mutex_m)

      # Connection to the job queue
      @pool_capacity = (Workling.config[:pool_size] || 25).to_i
    end
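The constructor reads three settings and falls back to a default when a key is missing. The following is a minimal sketch of that fallback logic only, assuming a plain Hash in place of Workling.config; the method name resolve_poller_settings and the sample values are hypothetical and not part of Workling.

```ruby
# Mirrors the constructor's fallback logic: use the configured value when it
# is present, otherwise the default, then coerce to the numeric type.
# `config` is a plain Hash standing in for Workling.config (an assumption).
def resolve_poller_settings(config)
  {
    :sleep_time    => (config[:sleep_time] || 2).to_f,
    :reset_time    => (config[:reset_time] || 30).to_f,
    :pool_capacity => (config[:pool_size]  || 25).to_i
  }
end

# Hypothetical configuration: only sleep_time is set, and as a string.
settings = resolve_poller_settings({ :sleep_time => "0.5" })
puts settings[:sleep_time]     # taken from the config, coerced to Float
puts settings[:reset_time]     # default, since the key is missing
puts settings[:pool_capacity]  # default, since the key is missing
```

Note that the config key is pool_size while the resulting attribute is pool_capacity.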
Instance Attribute Details
#pool_capacity ⇒ Object (readonly)
Returns the value of attribute pool_capacity.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 30
    def pool_capacity
      @pool_capacity
    end
#reset_time ⇒ Object (readonly)
Returns the value of attribute reset_time.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 30
    def reset_time
      @reset_time
    end
#sleep_time ⇒ Object (readonly)
Returns the value of attribute sleep_time.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 30
    def sleep_time
      @sleep_time
    end
Instance Method Details
#listen ⇒ Object
Starts checking for items on the queue. Blocks until stop is called and all pollers and workers have finished execution.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 54
    def listen
      logger.info("Starting ThreadPoolPoller...")

      # Determine which queues need checking
      Workling::Discovery.discovered_workers.map do |klass|
        @pollers.synchronize do
          # Polls the backing queue for jobs to be done
          @pollers << Thread.new do
            poller_thread(@routing.queue_names_routing_class(klass))
          end
        end
      end

      # Wait for the poller and all outstanding workers to finish.
      #
      # This is a little tricky because we're doing some synchronization on pollers... but
      # the list of pollers is never modified after being setup above.
      @pollers.synchronize { @pollers.dup }.each { |p| p.join }
      @pollers.synchronize { @pollers.clear }
      logger.info("Pollers have all finished")

      @workers.synchronize { @workers.dup }.each { |w| w.join }
      logger.info("Worker threads have all finished")
    end
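The waiting step in listen copies the thread list while holding the lock and joins the copies after releasing it. Since stop also synchronizes on the poller list, joining while still holding the lock would keep stop from ever setting the shutdown flags. A minimal sketch of that snapshot-then-join pattern follows; it uses a plain Mutex instead of Mutex_m, and the helper name join_all is hypothetical.

```ruby
# Snapshot-then-join: copy the list under the lock, then join the copies
# with the lock released so other threads can still synchronize on it.
# Returns the number of threads that were joined.
def join_all(threads, lock)
  snapshot = lock.synchronize { threads.dup }
  snapshot.each { |t| t.join }
  snapshot.size
end

lock = Mutex.new
threads = []

# Start a few short-lived threads, registering each one under the lock.
3.times do |i|
  lock.synchronize { threads << Thread.new { sleep(0.01 * i) } }
end

joined = join_all(threads, lock)
puts "joined #{joined} threads"
```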
#poller_threads ⇒ Object
Number of currently active polling threads
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 96
    def poller_threads
      @pollers.synchronize { @pollers.size }
    end
#stop ⇒ Object
Instructs the thread pool poller to stop checking for new jobs on the backing queue.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 80
    def stop
      logger.info("Stopping thread pool invoker pollers and workers...")
      @pollers.synchronize { @pollers.each { |p| p[:shutdown] = true } }
    end
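stop does not kill any thread; it only sets a :shutdown thread-local on each poller. The body of poller_thread is not shown on this page, so the loop below is an assumption about how a poller might observe that flag, not the library's implementation. The helper start_demo_poller is hypothetical.

```ruby
# Hypothetical poll loop: keeps running until another thread sets this
# thread's :shutdown flag, then returns the number of polls it performed.
def start_demo_poller(sleep_time)
  Thread.new do
    polls = 0
    until Thread.current[:shutdown]
      polls += 1          # a real poller would check the queue here
      sleep(sleep_time)
    end
    polls
  end
end

poller = start_demo_poller(0.01)
sleep(0.05)
poller[:shutdown] = true  # the same assignment #stop performs on each poller
puts "poller stopped after #{poller.value} polls"
```

Because the flag is only checked between polls, a poller in this sketch finishes its current iteration (including the sleep) before it exits.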
#worker_threads ⇒ Object
Number of currently active worker threads
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 91
    def worker_threads
      @workers.synchronize { @workers.size }
    end
#workers_available? ⇒ Boolean
Returns true while the number of active worker threads is below pool_capacity. Set pool_size in the Workling config to adjust the maximum number of threads in the pool.
    # File 'lib/workling/invokers/thread_pool_poller.rb', line 86
    def workers_available?
      worker_threads < @pool_capacity
    end
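The capacity check compares a lock-protected thread count against pool_capacity. The sketch below shows one way such a check could gate the start of new workers. DemoPool and its run method are hypothetical, a plain Mutex replaces Mutex_m, and the sketch assumes a single dispatching thread calls run.

```ruby
# Hypothetical bounded pool built around the same capacity check.
class DemoPool
  attr_reader :pool_capacity

  def initialize(pool_capacity)
    @pool_capacity = pool_capacity
    @workers = []
    @lock = Mutex.new
  end

  # Number of currently tracked worker threads.
  def worker_threads
    @lock.synchronize { @workers.size }
  end

  def workers_available?
    worker_threads < @pool_capacity
  end

  # Starts the block in a new worker thread, or returns nil when the pool
  # is full. The worker removes itself from the list when it finishes.
  def run(&block)
    return nil unless workers_available?
    @lock.synchronize do
      worker = Thread.new do
        begin
          block.call
        ensure
          @lock.synchronize { @workers.delete(Thread.current) }
        end
      end
      @workers << worker
      worker
    end
  end
end

pool = DemoPool.new(2)
gate = Queue.new
started = Array.new(2) { pool.run { gate.pop } }  # both workers block on the gate
puts pool.workers_available?                      # pool is at capacity here
2.times { gate << :go }                           # release the workers
started.each { |w| w.join }
puts pool.workers_available?                      # capacity is free again
```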