Class: PuppetLanguageServer::GlobalQueues::SingleInstanceQueue
- Inherits: Object
- Defined in: lib/puppet-languageserver/global_queues/single_instance_queue.rb
Overview
Base class for enqueuing and running queued jobs asynchronously. When a job is added, any other job with the same key is removed from the queue, so that only the latest job needs to be processed.
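The replace-by-key rule can be sketched in isolation. This is a minimal illustration rather than the class itself: `KeyedJob` and `replace_by_key` are hypothetical names, and a plain array stands in for the internal queue.

```ruby
# Hypothetical job type for illustration; real jobs are expected to derive
# from SingleInstanceQueueJob and expose a `key`.
KeyedJob = Struct.new(:key, :payload)

# Drop any queued job that shares the new job's key, then append the new job.
def replace_by_key(queue, job)
  queue.reject! { |item| item.key == job.key }
  queue << job
end

queue = []
replace_by_key(queue, KeyedJob.new('a.pp', 'v1'))
replace_by_key(queue, KeyedJob.new('b.pp', 'v1'))
replace_by_key(queue, KeyedJob.new('a.pp', 'v2'))

# Only the newest job for 'a.pp' remains, and it has moved to the back.
p queue.map { |job| [job.key, job.payload] }
# => [["b.pp", "v1"], ["a.pp", "v2"]]
```

Note that the replacement job goes to the end of the queue, so a key that is updated repeatedly keeps losing its place in line.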
Instance Method Summary
- #drain_queue ⇒ Object
  Wait for the queue to become empty.
- #enqueue(*args) ⇒ Object
  Helper method to create, then enqueue a job.
- #enqueue_job(job_object) ⇒ Object
  Enqueue a job.
- #execute(*args) ⇒ Object
  Helper method to create, then synchronously execute a job.
- #execute_job(job_object) ⇒ Object (abstract)
  Synchronously executes the same work as an enqueued item.
- #initialize ⇒ SingleInstanceQueue (constructor)
  A new instance of SingleInstanceQueue.
- #job_class ⇒ Object
  The Ruby job class that this queue operates on. Should inherit from SingleInstanceQueueJob.
- #max_queue_threads ⇒ Object
  Default is one thread to process the queue.
- #new_job(*args) ⇒ Object
- #reset_queue(initial_state = []) ⇒ Object
  Testing helper that resets the queue and prepopulates it with a known arbitrary configuration.
Constructor Details
#initialize ⇒ SingleInstanceQueue
Returns a new instance of SingleInstanceQueue.
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 19
    def initialize
      @queue = []
      @queue_mutex = Mutex.new
      @queue_threads_mutex = Mutex.new
      @queue_threads = []
    end
Instance Method Details
#drain_queue ⇒ Object
Wait for the queue to become empty
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 85
    def drain_queue
      @queue_threads.each do |item|
        item.join unless item.nil? || !item.alive?
      end
      nil
    end
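As the listing shows, draining works by joining the worker threads rather than by polling the queue itself. The sketch below isolates that join step; `join_live_threads` is a hypothetical name, and the threads here do trivial work in place of a real worker loop (which is not shown in this page).

```ruby
# Join every thread that is still running; nil entries and finished threads
# are skipped, matching the guard used in drain_queue.
def join_live_threads(threads)
  threads.each do |thr|
    thr.join unless thr.nil? || !thr.alive?
  end
  nil
end

finished = Queue.new
workers = [
  Thread.new { sleep 0.05; finished << :slow },
  Thread.new { finished << :fast },
  nil
]

join_live_threads(workers)
puts finished.size # prints 2: both workers have completed by this point
```

The queue is only guaranteed to be empty afterwards if each worker keeps running until no jobs remain, which is an assumption about the worker loop.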
#enqueue(*args) ⇒ Object
Helper method to create, then enqueue a job.
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 43
    def enqueue(*args)
      enqueue_job(new_job(*args))
    end
#enqueue_job(job_object) ⇒ Object
Enqueue a job. Any job already queued with the same key is removed first, and a new worker thread is started if fewer than max_queue_threads are alive.
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 48
    def enqueue_job(job_object)
      raise "Invalid job object for #{self.class}. Got #{job_object.class} but expected #{job_class}" unless job_object.is_a?(job_class)

      @queue_mutex.synchronize do
        @queue.reject! { |queue_item| queue_item.key == job_object.key }
        @queue << job_object
      end

      @queue_threads_mutex.synchronize do
        # Clear up any done threads
        @queue_threads.reject! { |thr| thr.nil? || !thr.alive? }
        # Append a new thread if we have space
        if @queue_threads.count < max_queue_threads
          @queue_threads << Thread.new do
            thread_worker
          rescue => e # rubocop:disable Style/RescueStandardError
            PuppetLanguageServer.(:error, "Error in #{self.class} Thread: #{e}")
            raise
          end
        end
      end
      nil
    end
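The thread bookkeeping in the second synchronized block can be sketched on its own: prune finished threads, then start a worker only if the pool is below its limit. `spawn_if_room` is a hypothetical helper, and a `Queue` used as a gate stands in for real work so the outcome is deterministic.

```ruby
# Start a worker only when fewer than `limit` threads are still alive.
# Returns true when a thread was started, false otherwise.
def spawn_if_room(threads, limit, &work)
  threads.reject! { |thr| thr.nil? || !thr.alive? } # prune finished threads
  return false unless threads.count < limit

  threads << Thread.new(&work)
  true
end

gate = Queue.new
threads = []

puts spawn_if_room(threads, 1) { gate.pop } # true: the pool was empty
puts spawn_if_room(threads, 1) { gate.pop } # false: one worker is still blocked
gate << :go
threads.each(&:join)
puts spawn_if_room(threads, 1) { :noop }    # true: the finished worker was pruned
```

With the default limit of one, enqueuing while a worker is alive adds the job to the queue without starting another thread.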
#execute(*args) ⇒ Object
Helper method to create, then synchronously execute a job.
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 73
    def execute(*args)
      execute_job(new_job(*args))
    end
#execute_job(job_object) ⇒ Object
Synchronously executes the same work as an enqueued item. Does not consume a queue thread. The thread worker calls this method when processing enqueued items.
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 80
    def execute_job(job_object)
      raise "Invalid job object for #{self.class}. Got #{job_object.class} but expected #{job_class}" unless job_object.is_a?(job_class)
    end
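Because the base method only validates the job type, a subclass is presumably expected to override it and do the actual work. The following is a self-contained sketch of that pattern under stated assumptions: `SketchJob`, `SketchQueue`, `UppercaseJob`, and `UppercaseQueue` are hypothetical stand-ins, not library classes, and calling `super` first is an assumed convention.

```ruby
# Hypothetical stand-in for SingleInstanceQueueJob.
class SketchJob
  attr_reader :key

  def initialize(key)
    @key = key
  end
end

# Hypothetical stand-in for the queue base class (synchronous parts only).
class SketchQueue
  def job_class
    SketchJob
  end

  def new_job(*args)
    job_class.new(*args)
  end

  def execute(*args)
    execute_job(new_job(*args))
  end

  # Type check only, as in the listing above.
  def execute_job(job_object)
    raise "Invalid job object for #{self.class}. Got #{job_object.class} but expected #{job_class}" unless job_object.is_a?(job_class)
  end
end

class UppercaseJob < SketchJob
  attr_reader :text

  def initialize(key, text)
    super(key)
    @text = text
  end
end

class UppercaseQueue < SketchQueue
  def job_class
    UppercaseJob
  end

  def execute_job(job_object)
    super # reuse the base type check before doing any work
    job_object.text.upcase
  end
end

puts UppercaseQueue.new.execute('doc-1', 'hello') # prints HELLO
```

Overriding `job_class` alongside `execute_job` keeps the type check and `new_job` aligned with the subclass's own job type.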
#job_class ⇒ Object
The Ruby job class that this queue operates on. Should inherit from SingleInstanceQueueJob.
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 34
    def job_class
      SingleInstanceQueueJob
    end
#max_queue_threads ⇒ Object
Default is one thread to process the queue
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 27
    def max_queue_threads
      1
    end
#new_job(*args) ⇒ Object
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 38
    def new_job(*args)
      job_class.new(*args)
    end
#reset_queue(initial_state = []) ⇒ Object
Testing helper that resets the queue and prepopulates it with a known arbitrary configuration. ONLY USE THIS FOR TESTING!
    # File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 95
    def reset_queue(initial_state = [])
      @queue_mutex.synchronize do
        @queue = initial_state
      end
    end
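One detail worth seeing in a test setting: the array is assigned without copying, so the queue and the caller share the same object. The sketch below reuses the `reset_queue` body from the listing; `ResettableQueue`, `SeedJob`, and the `pending_keys` accessor are hypothetical additions for illustration.

```ruby
# Hypothetical job type carrying only a key.
SeedJob = Struct.new(:key)

# Hypothetical stand-in holding just the state that reset_queue touches.
class ResettableQueue
  def initialize
    @queue = []
    @queue_mutex = Mutex.new
  end

  # Same body as reset_queue in the listing above.
  def reset_queue(initial_state = [])
    @queue_mutex.synchronize do
      @queue = initial_state
    end
  end

  # Accessor added for this sketch only, to inspect the pending jobs.
  def pending_keys
    @queue_mutex.synchronize { @queue.map(&:key) }
  end
end

queue = ResettableQueue.new
seed = [SeedJob.new('a.pp'), SeedJob.new('b.pp')]
queue.reset_queue(seed)
p queue.pending_keys # => ["a.pp", "b.pp"]

seed << SeedJob.new('c.pp') # mutating the caller's array also changes the queue
p queue.pending_keys # => ["a.pp", "b.pp", "c.pp"]

queue.reset_queue # no argument: back to an empty queue
p queue.pending_keys # => []
```

Passing `seed.dup` instead would keep later changes to the test fixture from leaking into the queue.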