Class: RocketJob::Worker
- Inherits:
-
Object
- Object
- RocketJob::Worker
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/worker.rb
Overview
Worker
A worker runs on a single operating system thread Is usually started under a Rocket Job server process.
Direct Known Subclasses
Defined Under Namespace
Classes: Shutdown
Instance Attribute Summary collapse
-
#current_filter ⇒ Object
Returns the value of attribute current_filter.
-
#id ⇒ Object
Returns the value of attribute id.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#server_name ⇒ Object
readonly
Returns the value of attribute server_name.
Instance Method Summary collapse
-
#add_to_current_filter(filter) ⇒ Object
Add the supplied filter to the current filter.
- #alive? ⇒ Boolean
- #backtrace ⇒ Object
-
#find_and_assign_job ⇒ Object
Finds the next job to work on in priority based order and assigns it to this worker.
-
#initialize(id: 0, server_name: "inline:0") ⇒ Worker
constructor
A new instance of Worker.
- #join(*_args) ⇒ Object
- #kill ⇒ Object
-
#next_available_job ⇒ Object
Returns [RocketJob::Job] the next job available for processing.
-
#random_wait_interval ⇒ Object
Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.
-
#reset_filter_if_expired ⇒ Object
Resets the current job filter if the relevant time interval has passed.
-
#run ⇒ Object
Process jobs until it shuts down.
- #shutdown! ⇒ Object
- #shutdown? ⇒ Boolean
-
#throttled_job?(job) ⇒ Boolean
Whether the supplied job has been throttled and should be ignored.
-
#wait_for_shutdown?(_timeout = nil) ⇒ Boolean
Returns [true|false] whether the shutdown indicator was set.
Constructor Details
#initialize(id: 0, server_name: "inline:0") ⇒ Worker
Returns a new instance of Worker.
19 20 21 22 23 24 25 |
# File 'lib/rocket_job/worker.rb', line 19 def initialize(id: 0, server_name: "inline:0") @id = id @server_name = server_name @name = "#{server_name}:#{id}" @re_check_start = Time.now @current_filter = Config.filter || {} end |
Instance Attribute Details
#current_filter ⇒ Object
Returns the value of attribute current_filter.
9 10 11 |
# File 'lib/rocket_job/worker.rb', line 9 def current_filter @current_filter end |
#id ⇒ Object
Returns the value of attribute id.
9 10 11 |
# File 'lib/rocket_job/worker.rb', line 9 def id @id end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
10 11 12 |
# File 'lib/rocket_job/worker.rb', line 10 def name @name end |
#server_name ⇒ Object (readonly)
Returns the value of attribute server_name.
10 11 12 |
# File 'lib/rocket_job/worker.rb', line 10 def server_name @server_name end |
Instance Method Details
#add_to_current_filter(filter) ⇒ Object
Add the supplied filter to the current filter.
165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/rocket_job/worker.rb', line 165 def add_to_current_filter(filter) filter.each_pair do |k, v| current_filter[k] = if (previous = current_filter[k]) v.is_a?(Array) ? previous + v : v else v end end current_filter end |
#alive? ⇒ Boolean
27 28 29 |
# File 'lib/rocket_job/worker.rb', line 27 def alive? true end |
#backtrace ⇒ Object
31 32 33 |
# File 'lib/rocket_job/worker.rb', line 31 def backtrace Thread.current.backtrace end |
#find_and_assign_job ⇒ Object
Finds the next job to work on in priority based order and assigns it to this worker.
Applies the current filter to exclude filtered jobs.
Returns nil if no jobs are available for processing.
153 154 155 156 157 158 159 160 161 162 |
# File 'lib/rocket_job/worker.rb', line 153 def find_and_assign_job SemanticLogger.silence(:info) do scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now) working = RocketJob::Job.queued.or(state: "running", sub_state: "processing") query = RocketJob::Job.and(working, scheduled) query = query.and(current_filter) unless current_filter.blank? update = {"$set" => {"worker_name" => name, "state" => "running"}} query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) end end |
#join(*_args) ⇒ Object
35 36 37 |
# File 'lib/rocket_job/worker.rb', line 35 def join(*_args) true end |
#kill ⇒ Object
39 40 41 |
# File 'lib/rocket_job/worker.rb', line 39 def kill true end |
#next_available_job ⇒ Object
Returns [RocketJob::Job] the next job available for processing. Returns [nil] if no job is available for processing.
Notes:
-
Destroys expired jobs
-
Runs job throttles and skips the job if it is throttled.
-
Adding that filter to the current filter to exclude from subsequent polling.
-
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/rocket_job/worker.rb', line 106 def next_available_job until shutdown? job = find_and_assign_job return unless job if job.expired? job.fail_on_exception! do job.worker_name = name job.destroy logger.info("Destroyed expired job.") end next end # Batch Job that is already started? # Batch has its own throttles for slices. return job if job.running? # Should this job be throttled? next if job.fail_on_exception! { throttled_job?(job) } # Job failed during throttle execution? next if job.failed? # Start this job! job.fail_on_exception! { job.start!(name) } return job if job.running? end end |
#random_wait_interval ⇒ Object
Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.
178 179 180 |
# File 'lib/rocket_job/worker.rb', line 178 def random_wait_interval rand(Config.max_poll_seconds * 1000) / 1000 end |
#reset_filter_if_expired ⇒ Object
Resets the current job filter if the relevant time interval has passed
90 91 92 93 94 95 96 97 |
# File 'lib/rocket_job/worker.rb', line 90 def reset_filter_if_expired # Only clear out the current_filter after every `re_check_seconds` time = Time.now return unless (time - @re_check_start) > Config.re_check_seconds @re_check_start = time self.current_filter = Config.filter || {} end |
#run ⇒ Object
Process jobs until it shuts down
Params
worker_id [Integer]
The number of this worker for logging purposes
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/rocket_job/worker.rb', line 61 def run Thread.current.name = format("rocketjob %03i", id) logger.info "Started" until shutdown? sleep_seconds = Config.max_poll_seconds reset_filter_if_expired job = next_available_job # Returns true when work was completed, but no other work is available if job&.rocket_job_work(self, false) # Return the database connections for this thread back to the connection pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) # Stagger workers so that they don't all poll at the same time. sleep_seconds = random_wait_interval end wait_for_shutdown?(sleep_seconds) end logger.info "Stopping" rescue Exception => e logger.fatal("Unhandled exception in job processing thread", e) ensure ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end |
#shutdown! ⇒ Object
47 48 49 |
# File 'lib/rocket_job/worker.rb', line 47 def shutdown! true end |
#shutdown? ⇒ Boolean
43 44 45 |
# File 'lib/rocket_job/worker.rb', line 43 def shutdown? false end |
#throttled_job?(job) ⇒ Boolean
Whether the supplied job has been throttled and should be ignored.
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/rocket_job/worker.rb', line 136 def throttled_job?(job) # Evaluate job throttles, if any. filter = job.rocket_job_throttles.matching_filter(job) return false unless filter add_to_current_filter(filter) # Restore retrieved job so that other workers can process it later job.set(worker_name: nil, state: :queued) true end |
#wait_for_shutdown?(_timeout = nil) ⇒ Boolean
Returns [true|false] whether the shutdown indicator was set
52 53 54 |
# File 'lib/rocket_job/worker.rb', line 52 def wait_for_shutdown?(_timeout = nil) false end |