Class: PuppetLanguageServer::GlobalQueues::SingleInstanceQueue

Inherits:
Object
  • Object
Defined in:
lib/puppet-languageserver/global_queues/single_instance_queue.rb

Overview

Base class for enqueuing and running queued jobs asynchronously. Adding a job removes any other job with the same key from the queue, so that only the latest job for that key needs to be processed.
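The replacement rule can be illustrated with a minimal sketch. `DemoJob` and `LatestWinsQueue` are hypothetical stand-ins, not part of puppet-languageserver; they keep only the "remove same key, then append" step and leave out the worker threads.

```ruby
# Hypothetical stand-in for a queue job: only the key matters for replacement.
DemoJob = Struct.new(:key, :payload)

# Hypothetical, simplified queue that keeps only the replacement step.
class LatestWinsQueue
  def initialize
    @queue = []
    @mutex = Mutex.new
  end

  # Drop any queued job with the same key, then append the new job.
  def add(job)
    @mutex.synchronize do
      @queue.reject! { |queued| queued.key == job.key }
      @queue << job
    end
    nil
  end

  # Snapshot of the pending jobs as [key, payload] pairs, oldest first.
  def pending
    @mutex.synchronize { @queue.map { |job| [job.key, job.payload] } }
  end
end

queue = LatestWinsQueue.new
queue.add(DemoJob.new('a.pp', 'edit 1'))
queue.add(DemoJob.new('b.pp', 'edit 1'))
queue.add(DemoJob.new('a.pp', 'edit 2'))
p queue.pending # => [["b.pp", "edit 1"], ["a.pp", "edit 2"]]
```

Note that the replaced job loses its position: the newer job for `a.pp` ends up behind `b.pp`.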

Direct Known Subclasses

SidecarQueue, ValidationQueue

Constructor Details

#initialize ⇒ SingleInstanceQueue

Returns a new instance of SingleInstanceQueue.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 19

def initialize
  @queue = []
  @queue_mutex = Mutex.new
  @queue_threads_mutex = Mutex.new
  @queue_threads = []
end

Instance Method Details

#drain_queue ⇒ Object

Waits for the queue to become empty by joining every worker thread that is still alive.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 85

def drain_queue
  @queue_threads.each do |item|
    item.join unless item.nil? || !item.alive?
  end
  nil
end
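The join pattern used by `drain_queue` can be tried in isolation. This is a minimal sketch; `join_live_threads` is a hypothetical helper that mirrors the loop above and is not part of the library.

```ruby
# Hypothetical helper mirroring the join-each-live-thread pattern.
def join_live_threads(threads)
  threads.each do |thread|
    # Skip empty slots and threads that have already finished.
    thread.join unless thread.nil? || !thread.alive?
  end
  nil
end

results = Queue.new
workers = [
  Thread.new do
    sleep 0.05
    results << :slow
  end,
  nil, # nil entries are skipped
  Thread.new { results << :fast }
]

join_live_threads(workers)
p results.size # => 2
```

Only threads present in the list at the time of the call are joined, so work enqueued afterwards is not waited for.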

#enqueue(*args) ⇒ Object

Helper method that creates a job and then enqueues it.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 43

def enqueue(*args)
  enqueue_job(new_job(*args))
end

#enqueue_job(job_object) ⇒ Object

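Enqueues a job. Any queued job with the same key is removed first, and a new worker thread is started if fewer than max_queue_threads are alive.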



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 48

def enqueue_job(job_object)
  raise "Invalid job object for #{self.class}. Got #{job_object.class} but expected #{job_class}" unless job_object.is_a?(job_class)

  @queue_mutex.synchronize do
    @queue.reject! { |queue_item| queue_item.key == job_object.key }
    @queue << job_object
  end

  @queue_threads_mutex.synchronize do
    # Clear up any done threads
    @queue_threads.reject! { |thr| thr.nil? || !thr.alive? }
    # Append a new thread if we have space
    if @queue_threads.count < max_queue_threads
      @queue_threads << Thread.new do
        thread_worker
      rescue => e # rubocop:disable Style/RescueStandardError
        PuppetLanguageServer.log_message(:error, "Error in #{self.class} Thread: #{e}")
        raise
      end
    end
  end
  nil
end
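The listing above starts a thread that calls `thread_worker`, which this page does not show. The following is only a sketch of one way such a worker loop could behave, assuming it takes the oldest job under the queue mutex and runs it outside the lock. `WorkerLoopSketch` and its loop are assumptions for illustration, not the library's implementation.

```ruby
# Hypothetical, simplified queue showing one way a worker loop could drain jobs.
class WorkerLoopSketch
  attr_reader :processed

  def initialize(jobs)
    @queue = jobs.dup
    @queue_mutex = Mutex.new
    @processed = []
  end

  # Assumed shape of a worker: take the oldest job under the lock,
  # run it outside the lock, and exit when nothing is left.
  def thread_worker
    loop do
      job = @queue_mutex.synchronize { @queue.shift }
      break if job.nil?

      execute_job(job)
    end
  end

  # Stand-in for the real work; records the job so the result is observable.
  def execute_job(job)
    @processed << job
  end
end

sketch = WorkerLoopSketch.new(%w[job-1 job-2 job-3])
worker = Thread.new { sketch.thread_worker }
worker.join
p sketch.processed # => ["job-1", "job-2", "job-3"]
```

Running the job outside the lock means new jobs can be enqueued while one is being processed.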

#execute(*args) ⇒ Object

Helper method that creates a job and then executes it synchronously.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 73

def execute(*args)
  execute_job(new_job(*args))
end

#execute_job(job_object) ⇒ Object

This method is abstract.

Synchronously executes the same work as an enqueued item. Does not consume a queue thread. The thread worker calls this method when processing enqueued items.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 80

def execute_job(job_object)
  raise "Invalid job object for #{self.class}. Got #{job_object.class} but expected #{job_class}" unless job_object.is_a?(job_class)
end
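Because the base method only validates the job type, a subclass is expected to supply the actual work. The sketch below shows one way to do that, assuming the subclass overrides both `job_class` and `execute_job`. All class names here (`SketchJob`, `SketchQueue`, `UppercaseJob`, `UppercaseQueue`) are hypothetical stand-ins so the example runs without puppet-languageserver.

```ruby
# Hypothetical stand-in for the job base class.
class SketchJob
  attr_reader :key

  def initialize(key)
    @key = key
  end
end

# Hypothetical stand-in for the queue base class (synchronous parts only).
class SketchQueue
  def job_class
    SketchJob
  end

  def new_job(*args)
    job_class.new(*args)
  end

  def execute(*args)
    execute_job(new_job(*args))
  end

  # Base behaviour: only validate the job type.
  def execute_job(job_object)
    raise "Invalid job object for #{self.class}. Got #{job_object.class} but expected #{job_class}" unless job_object.is_a?(job_class)
  end
end

# Hypothetical subclass pair: narrows the job type and adds the real work.
class UppercaseJob < SketchJob; end

class UppercaseQueue < SketchQueue
  def job_class
    UppercaseJob
  end

  def execute_job(job_object)
    super # keep the type check from the base class
    job_object.key.upcase
  end
end

p UppercaseQueue.new.execute('manifest.pp') # => "MANIFEST.PP"
```

Calling `super` first keeps the type check, so passing a plain `SketchJob` to `UppercaseQueue#execute_job` raises a `RuntimeError`.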

#job_class ⇒ Object

The Ruby job class that this queue operates on. It should inherit from SingleInstanceQueueJob.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 34

def job_class
  SingleInstanceQueueJob
end

#max_queue_threads ⇒ Object

Maximum number of threads used to process the queue. The default is one.



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 27

def max_queue_threads
  1
end

#new_job(*args) ⇒ Object



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 38

def new_job(*args)
  job_class.new(*args)
end

#reset_queue(initial_state = []) ⇒ Object

Testing helper that resets the queue and prepopulates it with a known, arbitrary configuration. ONLY USE THIS FOR TESTING!



# File 'lib/puppet-languageserver/global_queues/single_instance_queue.rb', line 95

def reset_queue(initial_state = [])
  @queue_mutex.synchronize do
    @queue = initial_state
  end
end
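One detail worth knowing when writing tests: `reset_queue` assigns the array it is given rather than copying it, so the queue and the caller share the same object. The sketch below demonstrates this with `ResettableSketch`, a hypothetical stand-in that copies the method above and adds an illustrative `add` method.

```ruby
# Hypothetical stand-in that reuses the reset_queue body shown above.
class ResettableSketch
  def initialize
    @queue = []
    @queue_mutex = Mutex.new
  end

  def reset_queue(initial_state = [])
    @queue_mutex.synchronize do
      @queue = initial_state
    end
  end

  # Illustrative only: append an item so the sharing becomes visible.
  def add(item)
    @queue_mutex.synchronize { @queue << item }
    nil
  end
end

seed = [:job_a]
sketch = ResettableSketch.new
sketch.reset_queue(seed)
sketch.add(:job_b)
p seed # => [:job_a, :job_b], the caller's array changed too

# Passing a copy keeps the test fixture untouched.
fixture = [:job_a]
sketch.reset_queue(fixture.dup)
sketch.add(:job_c)
p fixture # => [:job_a]
```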