Class: LogStash::Outputs::Gcs::WorkerPool
- Inherits:
-
Object
- Object
- LogStash::Outputs::Gcs::WorkerPool
- Defined in:
- lib/logstash/outputs/gcs/worker_pool.rb
Overview
WorkerPool creates a pool of workers that can handle jobs.
Instance Attribute Summary collapse
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
-
#initialize(max_threads, synchronous = false) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
-
#post(&block) ⇒ Object
Submits a job to the worker pool, raises an error if the pool has already been stopped.
-
#stop! ⇒ Object
Stops the worker pool.
Constructor Details
#initialize(max_threads, synchronous = false) ⇒ WorkerPool
Returns a new instance of WorkerPool.
12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/logstash/outputs/gcs/worker_pool.rb', line 12 def initialize(max_threads, synchronous=false) @synchronous = synchronous # set queue depth to the be the same as the number of threads so # there's at most one pending job each when the plugin quits @workers = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_threads, max_queue: max_threads, fallback_policy: :caller_runs ) end |
Instance Attribute Details
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
10 11 12 |
# File 'lib/logstash/outputs/gcs/worker_pool.rb', line 10 def workers @workers end |
Instance Method Details
#post(&block) ⇒ Object
Submits a job to the worker pool, raises an error if the pool has already been stopped.
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/logstash/outputs/gcs/worker_pool.rb', line 27 def post(&block) raise 'Pool already stopped' unless @workers.running? if @synchronous block.call else @workers.post do block.call end end end |
#stop! ⇒ Object
Stops the worker pool
40 41 42 43 |
# File 'lib/logstash/outputs/gcs/worker_pool.rb', line 40 def stop! @workers.shutdown @workers.wait_for_termination end |