Class: Cloudtasker::Cron::Job

Inherits:
Object
Defined in:
lib/cloudtasker/cron/job.rb

Overview

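Manages the cron behaviour of a worker: it reads the cron schedule attached to the worker, tracks the processing state of the cron instance in Redis, and schedules the next instance.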

Constructor Details

#initialize(worker) ⇒ Job

Build a new instance of the class.

Parameters:

  • worker

    The worker instance the cron job wraps.



# File 'lib/cloudtasker/cron/job.rb', line 16

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



# File 'lib/cloudtasker/cron/job.rb', line 9

def worker
  @worker
end

Instance Method Details

#cron_job? ⇒ Boolean

Return true if the worker is tagged as a cron job.

Returns:

  • (Boolean)

    True if the worker relates to a cron schedule.



# File 'lib/cloudtasker/cron/job.rb', line 78

def cron_job?
  cron_schedule
end

#cron_schedule ⇒ Cloudtasker::Cron::Schedule?

Return the cron schedule to use for the job.

Returns:

  • (Cloudtasker::Cron::Schedule, nil)

    The cron schedule, or nil if the worker has no schedule id.



# File 'lib/cloudtasker/cron/job.rb', line 114

def cron_schedule
  return nil unless schedule_id

  @cron_schedule ||= Cron::Schedule.find(schedule_id)
end

#current_time ⇒ Time

Return the time this cron instance is expected to run at.

Returns:

  • (Time)

    The current cron instance time.



# File 'lib/cloudtasker/cron/job.rb', line 125

def current_time
  @current_time ||=
    begin
      Time.parse(worker.job_meta.get(key(:time_at)).to_s)
    rescue ArgumentError
      Time.try(:current) || Time.now
    end
end
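
The parse-with-fallback behaviour can be sketched in plain Ruby. This is a minimal illustration, not the library's code: `resolve_time` is a hypothetical helper name, and the sketch falls back to `Time.now` only, leaving out the ActiveSupport `Time.current` lookup used above.

```ruby
require 'time'

# Hypothetical helper: parse the stored timestamp and fall back to the
# current time when the value is missing or is not a valid time string.
def resolve_time(raw)
  # nil.to_s is "", which Time.parse rejects with ArgumentError
  Time.parse(raw.to_s)
rescue ArgumentError
  Time.now
end

scheduled = resolve_time('2024-01-01T00:00:00Z') # parsed from metadata
fallback  = resolve_time(nil)                    # no metadata: current time
```

A worker that was not enqueued by the cron scheduler carries no `time_at` metadata, so the fallback branch is the one taken in that case.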

#execute ⇒ Object

Execute the (cron) job. This method is invoked by the cron middleware.



# File 'lib/cloudtasker/cron/job.rb', line 184

def execute
  # Execute the job immediately if this worker is not flagged as a cron job.
  return yield unless cron_job?

  # Abort and reject job if this cron instance is not expected.
  return true unless expected_instance?

  # Schedule the next instance of the job
  schedule! unless retry_instance?

  # Flag the cron instance as processing.
  flag(:processing)

  # Execute the cron instance
  yield

  # Flag the cron instance as done
  flag(:done)
end
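
The order of operations in `#execute` is easier to follow with a small stand-in that records each step. The sketch below is an illustration only: `ExecuteFlowSketch` is a hypothetical class, and the three predicates are injected as booleans instead of being read from the schedule and Redis.

```ruby
# Hypothetical stand-in for Job that records what #execute would do.
# The predicate results are injected so that each path can be exercised.
class ExecuteFlowSketch
  attr_reader :events

  def initialize(cron_job:, expected:, retrying:)
    @cron_job = cron_job
    @expected = expected
    @retrying = retrying
    @events = []
  end

  def execute
    return yield unless @cron_job # plain worker: run the block directly
    return true unless @expected  # outdated cron instance: skip it

    @events << :schedule_next unless @retrying
    @events << :flag_processing
    yield
    @events << :flag_done
  end
end

first_run = ExecuteFlowSketch.new(cron_job: true, expected: true, retrying: false)
first_run.execute { first_run.events << :run }
# first_run.events => [:schedule_next, :flag_processing, :run, :flag_done]
```

If the block raises, the last step is never reached. In the method above this means the state stays at "processing", which appears to be what lets a retried run be recognised by `#retry_instance?` and skip rescheduling.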

#expected_instance? ⇒ Boolean

Return true if the cron job is the one we are expecting. This method is used to ensure that jobs related to outdated cron schedules do not get processed.

Returns:

  • (Boolean)

    True if the cron job is expected.



# File 'lib/cloudtasker/cron/job.rb', line 150

def expected_instance?
  retry_instance? || cron_schedule.job_id == job_id
end
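
As a rough illustration of the check, the sketch below compares a worker's job id with the id recorded on the schedule. `ScheduleRecord` and the `retrying:` keyword are hypothetical stand-ins for the stored schedule and for `#retry_instance?`.

```ruby
# Hypothetical stand-in for the stored schedule, which records the id of
# the job it currently expects to run.
ScheduleRecord = Struct.new(:job_id)

# Same rule as above: a retry is always accepted, otherwise the job id
# must match the one recorded on the schedule.
def expected_instance?(schedule, job_id, retrying: false)
  retrying || schedule.job_id == job_id
end

schedule = ScheduleRecord.new('job-2')
expected_instance?(schedule, 'job-2')                 # current instance
expected_instance?(schedule, 'job-1')                 # outdated instance
expected_instance?(schedule, 'job-1', retrying: true) # retry of a started run
```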

#flag(state) ⇒ Object

Store the cron job instance state.

Parameters:

  • state (String, Symbol)

    The worker state.



# File 'lib/cloudtasker/cron/job.rb', line 159

def flag(state)
  state.to_sym == :done ? redis.del(job_gid) : redis.set(job_gid, state.to_s)
end
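
The effect of `#flag` on the stored state can be shown with an in-memory store. This is a sketch under assumptions: `FakeRedis` is a hypothetical stand-in exposing only `get`, `set` and `del`, and the job id in the key is made up.

```ruby
# Minimal in-memory stand-in for the Redis client (assumption: only
# get/set/del are needed for this illustration).
class FakeRedis
  def initialize
    @store = {}
  end

  def get(key)
    @store[key]
  end

  def set(key, value)
    @store[key] = value
  end

  def del(key)
    @store.delete(key)
  end
end

# Same branching as above: :done clears the key, any other state is stored.
def flag(redis, job_gid, state)
  state.to_sym == :done ? redis.del(job_gid) : redis.set(job_gid, state.to_s)
end

redis = FakeRedis.new
gid = 'cloudtasker/cron/job/some-job-id' # hypothetical namespaced id

flag(redis, gid, :processing)
redis.get(gid) # "processing" while the instance runs
flag(redis, gid, 'done')
redis.get(gid) # nil once the instance has finished
```

Deleting the key on completion, instead of storing "done", means a finished instance leaves nothing behind in the store.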

#job_gid ⇒ String

Return the namespaced worker id.

Returns:

  • (String)

    The worker namespaced id.



# File 'lib/cloudtasker/cron/job.rb', line 60

def job_gid
  key(job_id)
end

#job_id ⇒ String

Return the worker id.

Returns:

  • (String)

    The worker id.



# File 'lib/cloudtasker/cron/job.rb', line 51

def job_id
  worker.job_id
end

#key(val) ⇒ String

Return a namespaced key

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



# File 'lib/cloudtasker/cron/job.rb', line 27

def key(val)
  return nil if val.nil?

  [self.class.to_s.underscore, val.to_s].join('/')
end
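
To make the key format concrete, the sketch below reproduces the namespacing without ActiveSupport. The `underscore` helper is a simplified stand-in for `String#underscore` (an assumption; the real method handles more cases), and `namespaced_key` is a hypothetical name.

```ruby
# Simplified stand-in for ActiveSupport's String#underscore (assumption:
# it only handles '::' separators and lower-to-upper case boundaries).
def underscore(name)
  name.gsub('::', '/')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Hypothetical helper mirroring the method above for a given class name.
def namespaced_key(class_name, val)
  return nil if val.nil?

  [underscore(class_name), val.to_s].join('/')
end

puts namespaced_key('Cloudtasker::Cron::Job', :schedule_id)
# prints "cloudtasker/cron/job/schedule_id"
```

The same prefix is used for the worker metadata entries (`schedule_id`, `time_at`) and for the Redis key returned by `#job_gid`.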

#next_time ⇒ EtOrbi::EoTime

Return the Time when the job should run next.

Returns:

  • (EtOrbi::EoTime)

    The time the job should run next.



# File 'lib/cloudtasker/cron/job.rb', line 139

def next_time
  @next_time ||= cron_schedule&.next_time(current_time)
end

#redis ⇒ Cloudtasker::RedisClient

Return the cloudtasker redis client.

Returns:

  • (Cloudtasker::RedisClient)

    The cloudtasker redis client.



# File 'lib/cloudtasker/cron/job.rb', line 105

def redis
  @redis ||= RedisClient.new
end

#retry_instance? ⇒ Boolean

Return true if the worker is currently processing (includes retries).

Returns:

  • (Boolean)

    True if the worker is processing.



# File 'lib/cloudtasker/cron/job.rb', line 87

def retry_instance?
  cron_job? && state
end

#schedule! ⇒ Object

Schedule the next cron instance.

The task only gets scheduled the first time a worker runs for a given cron instance. A cron worker that fails and retries does not normally cause a new task to be scheduled.



# File 'lib/cloudtasker/cron/job.rb', line 170

def schedule!
  return false unless cron_schedule

  # Configure next cron worker
  next_worker = worker.new_instance.tap { |e| e.job_meta.set(key(:time_at), next_time.iso8601) }

  # Schedule next worker
  task = next_worker.schedule(time_at: next_time)
  cron_schedule.update(task_id: task.id, job_id: next_worker.job_id)
end

#schedule_id ⇒ String

Return the cron schedule id.

Returns:

  • (String)

    The schedule id.



# File 'lib/cloudtasker/cron/job.rb', line 69

def schedule_id
  @schedule_id ||= worker.job_meta.get(key(:schedule_id))
end

#set(schedule_id:) ⇒ Cloudtasker::Cron::Job

Add cron metadata to the worker.

Parameters:

  • schedule_id (String)

    The id of the cron schedule to attach to the worker.

Returns:

  • (Cloudtasker::Cron::Job)

    The job itself, so that calls can be chained.



# File 'lib/cloudtasker/cron/job.rb', line 41

def set(schedule_id:)
  worker.job_meta.set(key(:schedule_id), schedule_id.to_s)
  self
end

#state ⇒ Symbol?

Return the job processing state.

Returns:

  • (Symbol, nil)

    The processing state, or nil if no state is stored for the job.



# File 'lib/cloudtasker/cron/job.rb', line 96

def state
  redis.get(job_gid)&.to_sym
end