Class: Procrastinator::QueueWorker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/procrastinator/queue_worker.rb

Overview

A QueueWorker checks for tasks to run from the task store and executes them, updating information in the task store as necessary.

Constant Summary collapse

PERSISTER_METHODS =

expected methods for all persistence strategies

[:read, :update, :delete].freeze

Instance Method Summary collapse

Constructor Details

#initialize(queue:, config:) ⇒ QueueWorker

Returns a new instance of QueueWorker.

Raises:

  • (ArgumentError)


16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/procrastinator/queue_worker.rb', line 16

# Builds a worker for a single queue.
#
# @param queue [Symbol, Object] a Symbol is treated as a queue name and resolved through
#    config.queue(name: ...); any other non-nil value is used as the queue itself
# @param config [Object] configuration; resolves queue names and is passed to the Scheduler
# @raise [ArgumentError] if queue or config is nil
def initialize(queue:, config:)
   raise ArgumentError, ':queue cannot be nil' if queue.nil?
   raise ArgumentError, ':config cannot be nil' if config.nil?

   @config    = config
   @queue     = queue.is_a?(Symbol) ? config.queue(name: queue) : queue
   @scheduler = Scheduler.new(config)

   # Default logger writes into an in-memory StringIO rather than a file
   @logger = Logger.new(StringIO.new)
end

Instance Method Details

#halt ⇒ Object

Logs that the worker on this queue has halted, then closes the logger.



68
69
70
71
# File 'lib/procrastinator/queue_worker.rb', line 68

# Records that this worker has halted and then closes its logger.
#
# Does nothing (and does not even build the message) when no logger is set.
#
# @return [Object, nil] whatever the logger's close returns, or nil without a logger
def halt
   return if @logger.nil?

   @logger.info("Halted worker on queue: #{ name }")
   @logger.close
end

#open_log!(name, config) ⇒ Object

Starts a log file and returns the created Logger



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/procrastinator/queue_worker.rb', line 74

# Opens the log file for the given name and builds a Logger that writes to it.
#
# When config.log_level is nil or false, nothing is created on disk and the worker's
# current @logger is returned unchanged.
#
# Otherwise the log directory is created if missing, "<name>.log" is touched inside it, and
# a new Logger is built using the configured level, shift age and shift size.
#
# @param name [String] base name of the log file; also used as the Logger's progname
# @param config [Object] supplies log_level, log_dir, log_shift_age and log_shift_size;
#    log_dir must respond to / and mkpath (presumably a Pathname - confirm against Config)
# @return [Logger] the newly created Logger, or the existing @logger when log_level is unset
def open_log!(name, config)
   level = config.log_level
   return @logger unless level

   log_dir  = config.log_dir
   log_path = log_dir / "#{ name }.log"

   # Make sure both the directory and the file exist before the Logger opens the path
   log_dir.mkpath
   FileUtils.touch(log_path)

   # level is known to be truthy here, so no fallback level is needed
   Logger.new(log_path.to_path,
              config.log_shift_age, config.log_shift_size,
              level:     level,
              progname:  name,
              formatter: Config::DEFAULT_LOG_FORMATTER)
end

#work! ⇒ Object

Works on jobs forever



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/procrastinator/queue_worker.rb', line 33

# Opens this worker's log and then processes the queue in an endless loop.
#
# Each pass sleeps for the queue's update period and then calls work_one.
#
# @raise [StandardError] any error raised while opening the log or looping is logged at
#    fatal level and then re-raised to the caller
def work!
   log_name = "#{ name }-queue-worker"
   @logger  = open_log!(log_name, @config)
   @logger.info("Started worker thread to consume queue: #{ name }")

   loop do
      # The pause comes before the work, so even the first pass waits one period
      sleep(@queue.update_period)
      work_one
   end
rescue StandardError => error
   # Record what stopped the worker, then let the failure propagate
   @logger.fatal(error)
   raise
end

#work_one ⇒ Object

Performs at most one task from the queue; returns without doing anything when no task is available.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/procrastinator/queue_worker.rb', line 49

# Fetches the next task and attempts to run it once.
#
# Returns nil right away when next_task yields nothing. On success the task is deleted from
# the queue by id. If running or deleting raises a StandardError, the task is told about the
# failure and its data (everything in to_h except :id) is written back to the queue.
#
# @return [Object, nil] the result of the queue's delete or update call, or nil with no task
def work_one
   task = next_task(logger: @logger, container: @config.container, scheduler: @scheduler)
   return unless task

   begin
      task.run

      @queue.delete(task.id)
   rescue StandardError => error
      task.fail(error)

      # The id addresses the record; the remaining attributes are the values to store
      attributes = task.to_h
      task_id    = attributes.delete(:id)
      @queue.update(task_id, **attributes)
   end
end