Class: Cloudtasker::Cron::Job
- Inherits:
-
Object
- Object
- Cloudtasker::Cron::Job
- Defined in:
- lib/cloudtasker/cron/job.rb
Overview
Manage cron jobs
Instance Attribute Summary collapse
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Instance Method Summary collapse
-
#cron_job? ⇒ Boolean
Return true if the worker is tagged as a cron job.
-
#cron_schedule ⇒ Fugit::Cron
Return the cron schedule to use for the job.
-
#current_time ⇒ Time
Return the time this cron instance is expected to run at.
-
#execute ⇒ Object
Execute the (cron) job.
-
#expected_instance? ⇒ Boolean
Return true if the cron job is the one we are expecting.
-
#flag(state) ⇒ Object
Store the cron job instance state.
-
#initialize(worker) ⇒ Job
constructor
Build a new instance of the class.
-
#job_gid ⇒ String
Return the namespaced worker id.
-
#job_id ⇒ String
Return the worker id.
-
#key(val) ⇒ String
Return a namespaced key.
-
#next_time ⇒ EtOrbi::EoTime
Return the Time when the job should run next.
-
#redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
-
#retry_instance? ⇒ Boolean
Return true if the worker is currently processing (includes retries).
-
#schedule! ⇒ Object
Schedule the next cron instance.
-
#schedule_id ⇒ String
Return the cron schedule id.
-
#set(schedule_id:) ⇒ Cloudtasker::Cron::Job
Add cron metadata to the worker.
-
#state ⇒ String?
Return the job processing state.
Constructor Details
#initialize(worker) ⇒ Job
Build a new instance of the class
16 17 18 |
# File 'lib/cloudtasker/cron/job.rb', line 16 def initialize(worker) @worker = worker end |
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
9 10 11 |
# File 'lib/cloudtasker/cron/job.rb', line 9 def worker @worker end |
Instance Method Details
#cron_job? ⇒ Boolean
Return true if the worker is tagged as a cron job.
78 79 80 |
# File 'lib/cloudtasker/cron/job.rb', line 78 def cron_job? cron_schedule end |
#cron_schedule ⇒ Fugit::Cron
Return the cron schedule to use for the job.
114 115 116 117 118 |
# File 'lib/cloudtasker/cron/job.rb', line 114 def cron_schedule return nil unless schedule_id @cron_schedule ||= Cron::Schedule.find(schedule_id) end |
#current_time ⇒ Time
Return the time this cron instance is expected to run at.
125 126 127 128 129 130 131 132 |
# File 'lib/cloudtasker/cron/job.rb', line 125 def current_time @current_time ||= begin Time.parse(worker..get(key(:time_at)).to_s) rescue ArgumentError Time.try(:current) || Time.now end end |
#execute ⇒ Object
Execute the (cron) job. This method is invoked by the cron middleware.
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/cloudtasker/cron/job.rb', line 184 def execute # Execute the job immediately if this worker is not flagged as a cron job. return yield unless cron_job? # Abort and reject job if this cron instance is not expected. return true unless expected_instance? # Schedule the next instance of the job schedule! unless retry_instance? # Flag the cron instance as processing. flag(:processing) # Execute the cron instance yield # Flag the cron instance as done flag(:done) end |
#expected_instance? ⇒ Boolean
Return true if the cron job is the one we are expecting. This method is used to ensure that jobs related to outdated cron schedules do not get processed.
150 151 152 |
# File 'lib/cloudtasker/cron/job.rb', line 150 def expected_instance? retry_instance? || cron_schedule.job_id == job_id end |
#flag(state) ⇒ Object
Store the cron job instance state.
159 160 161 |
# File 'lib/cloudtasker/cron/job.rb', line 159 def flag(state) state.to_sym == :done ? redis.del(job_gid) : redis.set(job_gid, state.to_s) end |
#job_gid ⇒ String
Return the namespaced worker id.
60 61 62 |
# File 'lib/cloudtasker/cron/job.rb', line 60 def job_gid key(job_id) end |
#job_id ⇒ String
Return the worker id.
51 52 53 |
# File 'lib/cloudtasker/cron/job.rb', line 51 def job_id worker.job_id end |
#key(val) ⇒ String
Return a namespaced key
27 28 29 30 31 |
# File 'lib/cloudtasker/cron/job.rb', line 27 def key(val) return nil if val.nil? [self.class.to_s.underscore, val.to_s].join('/') end |
#next_time ⇒ EtOrbi::EoTime
Return the Time when the job should run next.
139 140 141 |
# File 'lib/cloudtasker/cron/job.rb', line 139 def next_time @next_time ||= cron_schedule&.next_time(current_time) end |
#redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client
105 106 107 |
# File 'lib/cloudtasker/cron/job.rb', line 105 def redis @redis ||= RedisClient.new end |
#retry_instance? ⇒ Boolean
Return true if the worker is currently processing (includes retries).
87 88 89 |
# File 'lib/cloudtasker/cron/job.rb', line 87 def retry_instance? cron_job? && state end |
#schedule! ⇒ Object
Schedule the next cron instance.
The task only gets scheduled the first time a worker runs for a given cron instance (Typically a cron worker failing and retrying will not lead to a new task getting scheduled).
170 171 172 173 174 175 176 177 178 179 |
# File 'lib/cloudtasker/cron/job.rb', line 170 def schedule! return false unless cron_schedule # Configure next cron worker next_worker = worker.new_instance.tap { |e| e..set(key(:time_at), next_time.iso8601) } # Schedule next worker task = next_worker.schedule(time_at: next_time) cron_schedule.update(task_id: task.id, job_id: next_worker.job_id) end |
#schedule_id ⇒ String
Return the cron schedule id.
69 70 71 |
# File 'lib/cloudtasker/cron/job.rb', line 69 def schedule_id @schedule_id ||= worker..get(key(:schedule_id)) end |
#set(schedule_id:) ⇒ Cloudtasker::Cron::Job
Add cron metadata to the worker.
41 42 43 44 |
# File 'lib/cloudtasker/cron/job.rb', line 41 def set(schedule_id:) worker..set(key(:schedule_id), schedule_id.to_s) self end |
#state ⇒ String?
Return the job processing state.
96 97 98 |
# File 'lib/cloudtasker/cron/job.rb', line 96 def state redis.get(job_gid)&.to_sym end |