Class: Cloudtasker::Cron::Schedule

Inherits:
Object
Defined in:
lib/cloudtasker/cron/schedule.rb

Overview

Manage cron schedules

Constructor Details

#initialize(id:, cron:, worker:, **opts) ⇒ Schedule

Build a new instance of the class.

Parameters:

  • id (String)

    The schedule id.

  • cron (String)

    The cron expression.

  • worker (Class)

    The worker class to run.

  • args (Array<any>)

    The worker arguments.

  • queue (String)

    The queue to use for the cron job.

  • task_id (String)

    The ID of the actual backend task.

  • job_id (String)

    The ID of the Cloudtasker worker.



# File 'lib/cloudtasker/cron/schedule.rb', line 127

def initialize(id:, cron:, worker:, **opts)
  @id = id
  @cron = cron
  @worker = worker
  @args = opts[:args]
  @queue = opts[:queue]
  @task_id = opts[:task_id]
  @job_id = opts[:job_id]
end

Instance Attribute Details

#args ⇒ Object

Returns the value of attribute args.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def args
  @args
end

#cron ⇒ Object

Returns the value of attribute cron.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def cron
  @cron
end

#id ⇒ Object

Returns the value of attribute id.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def id
  @id
end

#job_id ⇒ Object

Returns the value of attribute job_id.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def job_id
  @job_id
end

#queue ⇒ Object

Returns the value of attribute queue.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def queue
  @queue
end

#task_id ⇒ Object

Returns the value of attribute task_id.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def task_id
  @task_id
end

#worker ⇒ Object

Returns the value of attribute worker.



# File 'lib/cloudtasker/cron/schedule.rb', line 10

def worker
  @worker
end

Class Method Details

.all ⇒ Array<Cloudtasker::Cron::Schedule>

Return all schedules.

Returns:

  • (Array<Cloudtasker::Cron::Schedule>)

    The list of stored schedules.



# File 'lib/cloudtasker/cron/schedule.rb', line 37

def self.all
  if redis.exists?(key)
    # Use Schedule Set if available
    redis.smembers(key).map { |id| find(id) }
  else
    # Fallback to redis key matching and migrate schedules
    # to use Schedule Set instead.
    redis.search(key('*')).map do |gid|
      schedule_id = gid.sub(key(''), '')
      redis.sadd(key, [schedule_id])
      find(schedule_id)
    end
  end
end

.create(**opts) ⇒ Cloudtasker::Cron::Schedule

Create a new cron schedule (or update an existing one).

Parameters:

  • **opts (Hash)

    Initialization options. See #initialize.

Returns:

  • (Cloudtasker::Cron::Schedule)

    The created or updated schedule.



# File 'lib/cloudtasker/cron/schedule.rb', line 79

def self.create(**opts)
  redis.with_lock(key(opts[:id])) do
    config = find(opts[:id]).to_h.merge(opts)
    new(**config).tap(&:save)
  end
end
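
The merge in `create` handles both creation and update in one line because `nil.to_h` returns an empty hash. The following is a minimal sketch of that behavior only, with a plain Hash standing in for Redis; the `store` variable and the `merged_config` helper are hypothetical and not part of Cloudtasker.

```ruby
# Hypothetical helper mirroring `find(opts[:id]).to_h.merge(opts)`.
# `store` is an in-memory stand-in for the Redis-backed lookup.
def merged_config(store, opts)
  # A missing schedule yields nil, and nil.to_h is {}, so the caller's
  # options are used as-is. An existing schedule is overridden key by key.
  store[opts[:id]].to_h.merge(opts)
end

store = {
  'my_job' => { id: 'my_job', cron: '0 0 * * *', worker: 'MyWorker', task_id: 'abc' }
}

updated = merged_config(store, id: 'my_job', cron: '0 10 * * *')
created = merged_config(store, id: 'new_job', cron: '*/5 * * * *', worker: 'OtherWorker')
```

In the update case the stored `worker` and `task_id` are preserved, since only the keys passed by the caller are overridden.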

.delete(id) ⇒ Object

Delete a schedule by id.

Parameters:

  • id (String)

    The schedule id.



# File 'lib/cloudtasker/cron/schedule.rb', line 104

def self.delete(id)
  redis.with_lock(key(id)) do
    schedule = find(id)
    return false unless schedule

    # Delete task and stored schedule
    CloudTask.delete(schedule.task_id) if schedule.task_id
    redis.srem(key, [schedule.id])
    redis.del(schedule.gid)
  end
end

.find(id) ⇒ Cloudtasker::Cron::Schedule

Return a saved cron schedule.

Parameters:

  • id (String)

    The schedule id.

Returns:

  • (Cloudtasker::Cron::Schedule, nil)

    The schedule, or nil if none is stored under this id.



# File 'lib/cloudtasker/cron/schedule.rb', line 93

def self.find(id)
  return nil unless (schedule_config = redis.fetch(key(id)))

  new(**schedule_config)
end

.key(val = nil) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol, nil) (defaults to: nil)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



# File 'lib/cloudtasker/cron/schedule.rb', line 28

def self.key(val = nil)
  [to_s.underscore, val].compact.map(&:to_s).join('/')
end
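
To illustrate how the key is assembled, here is a minimal sketch that takes the namespace as an explicit argument instead of deriving it from the class name with ActiveSupport's `String#underscore`. The `namespaced_key` helper is hypothetical; the namespace string is what `underscore` would produce for `Cloudtasker::Cron::Schedule`.

```ruby
# Hypothetical helper: same join logic as .key, namespace passed explicitly.
def namespaced_key(namespace, val = nil)
  # compact drops a nil val, so calling without a value returns the bare namespace
  [namespace, val].compact.map(&:to_s).join('/')
end

namespace = 'cloudtasker/cron/schedule'

set_key = namespaced_key(namespace)          # key of the schedule set
job_key = namespaced_key(namespace, :my_job) # key of a single schedule
prefix  = namespaced_key(namespace, '')      # empty string is kept, giving a trailing slash

# .all strips this prefix to recover the schedule id from a matched key
schedule_id = job_key.sub(prefix, '')
```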

.load_from_hash!(hash) ⇒ Object

Synchronize list of cron schedules from a Hash. Schedules not listed in this hash will be removed.

Examples:

Cloudtasker::Cron::Schedule.load_from_hash!(
  my_job: { cron: '0 0 * * *', worker: 'MyWorker' },
  my_other_job: { cron: '0 10 * * *', worker: 'MyOtherWorker' }
)


# File 'lib/cloudtasker/cron/schedule.rb', line 62

def self.load_from_hash!(hash)
  schedules = hash.map do |id, config|
    schedule_config = JSON.parse(config.to_json, symbolize_names: true).merge(id: id.to_s)
    create(**schedule_config)
  end

  # Remove existing schedules which are not part of the list
  all.reject { |e| schedules.include?(e) }.each { |e| delete(e.id) }
end
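
The JSON round-trip in `load_from_hash!` normalizes each entry so that string keys (for example from a YAML file) and symbol keys produce the same config. A minimal sketch of that normalization step in isolation follows; the `normalize_schedule` helper is hypothetical.

```ruby
require 'json'

# Hypothetical helper mirroring the first line of the block in load_from_hash!.
def normalize_schedule(id, config)
  # Serializing then parsing with symbolize_names converts every key,
  # including nested ones, to a symbol. The id is always stored as a String.
  JSON.parse(config.to_json, symbolize_names: true).merge(id: id.to_s)
end

normalized = normalize_schedule(
  :my_job,
  { 'cron' => '0 0 * * *', 'worker' => 'MyWorker', 'args' => [1, { 'a' => 2 }] }
)
```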

.redis ⇒ Cloudtasker::RedisClient

Return the redis client.

Returns:

  • (Cloudtasker::RedisClient)

    The redis client.

# File 'lib/cloudtasker/cron/schedule.rb', line 17

def self.redis
  @redis ||= RedisClient.new
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



# File 'lib/cloudtasker/cron/schedule.rb', line 171

def ==(other)
  other.is_a?(self.class) && other.id == id
end
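
Equality is based on the id alone, which is what lets `load_from_hash!` match stored schedules against freshly created ones with `include?`. A minimal sketch using a hypothetical stand-in class (`DemoSchedule` is not part of Cloudtasker):

```ruby
# Hypothetical stand-in reproducing only the equality rule.
class DemoSchedule
  attr_reader :id, :cron

  def initialize(id:, cron:)
    @id = id
    @cron = cron
  end

  # Same class and same id means equal, whatever the other attributes are.
  def ==(other)
    other.is_a?(self.class) && other.id == id
  end
end

a = DemoSchedule.new(id: 'my_job', cron: '0 0 * * *')
b = DemoSchedule.new(id: 'my_job', cron: '0 10 * * *')
c = DemoSchedule.new(id: 'other_job', cron: '0 0 * * *')

same_id_equal = (a == b)       # different cron, same id
listed        = [a].include?(c) # Array#include? relies on ==
```

Because attributes other than the id are ignored, `changed?` or `config_changed?` is the right check when the question is whether the content differs.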

#assign_attributes(opts) ⇒ Object

Bulk edit the object attributes.

Parameters:

  • opts (Hash)

    The attributes to edit.



# File 'lib/cloudtasker/cron/schedule.rb', line 256

def assign_attributes(opts)
  opts
    .select { |k, _| instance_variables.include?("@#{k}".to_sym) }
    .each { |k, v| instance_variable_set("@#{k}", v) }
end
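
The filter on `instance_variables` means only attributes that were already set on the object can be assigned; unknown keys are silently ignored. A minimal sketch with a hypothetical stand-in class (`DemoRecord` is not part of Cloudtasker):

```ruby
# Hypothetical stand-in reproducing the assign_attributes logic.
class DemoRecord
  attr_reader :cron, :queue

  def initialize(cron:, queue: nil)
    @cron = cron
    @queue = queue # assigned even when nil, so it appears in instance_variables
  end

  def assign_attributes(opts)
    opts
      .select { |k, _| instance_variables.include?("@#{k}".to_sym) }
      .each { |k, v| instance_variable_set("@#{k}", v) }
  end
end

record = DemoRecord.new(cron: '0 0 * * *')
record.assign_attributes(queue: 'critical', unknown: 'ignored')
```

Since the constructor of `Schedule` assigns every attribute, including those left as nil, all of them remain editable this way.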

#changed? ⇒ Boolean

Return true if the instance attributes were changed compared to the schedule saved in Redis.

Returns:

  • (Boolean)

    True if the schedule was modified.



# File 'lib/cloudtasker/cron/schedule.rb', line 191

def changed?
  to_h != self.class.find(id).to_h
end

#config_changed? ⇒ Boolean

Return true if the configuration of the schedule (id, cron expression, worker, args or queue) differs from the one saved in Redis.

Returns:

  • (Boolean)

    True if the schedule config was changed.



# File 'lib/cloudtasker/cron/schedule.rb', line 181

def config_changed?
  self.class.find(id)&.to_config != to_config
end

#cron_schedule ⇒ Fugit::Cron

Return the cron schedule to use for the job.

Returns:

  • (Fugit::Cron)

    The cron schedule.



# File 'lib/cloudtasker/cron/schedule.rb', line 227

def cron_schedule
  @cron_schedule ||= Fugit::Cron.parse(cron)
end

#gid ⇒ String

Return the namespaced schedule id.

Returns:

  • (String)

    The namespaced schedule id.



# File 'lib/cloudtasker/cron/schedule.rb', line 151

def gid
  self.class.key(id)
end

#next_time(*args) ⇒ EtOrbi::EoTime

Return the next time a job should run.

Parameters:

  • time (Time)

    An optional reference time, used instead of Time.now.

Returns:

  • (EtOrbi::EoTime)

    The time the scheduled job should run next.



# File 'lib/cloudtasker/cron/schedule.rb', line 247

def next_time(*args)
  cron_schedule.next_time(*args)
end

#redis ⇒ Cloudtasker::RedisClient

Return the redis client.

Returns:

  • (Cloudtasker::RedisClient)

    The redis client.

# File 'lib/cloudtasker/cron/schedule.rb', line 142

def redis
  self.class.redis
end

#save(update_task: true) ⇒ Object

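Save the object in Redis. If update_task is true and either the configuration was changed or the schedule has no existing backend task, any existing cloud task is removed and a new task is created. Returns false without saving if the schedule is invalid.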



# File 'lib/cloudtasker/cron/schedule.rb', line 276

def save(update_task: true)
  return false unless valid?

  # Save schedule
  config_was_changed = config_changed?
  redis.sadd(self.class.key, [id])
  redis.write(gid, to_h)

  # Stop there if backend does not need update
  return true unless update_task && (config_was_changed || !task_id || !CloudTask.find(task_id))

  # Update backend
  persist_cloud_task
end
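
The guard that decides whether the backend task is refreshed can be restated as a small predicate. This is a sketch of the condition only; the `backend_update_needed?` helper and its keyword arguments are hypothetical, with `task_exists` standing in for the result of `CloudTask.find(task_id)`.

```ruby
# Hypothetical pure function restating the guard in #save.
def backend_update_needed?(update_task:, config_changed:, task_id:, task_exists:)
  # The backend is touched only when updates are allowed AND one of:
  # the config changed, no task id is recorded, or the task no longer exists.
  update_task && (config_changed || !task_id || !task_exists)
end

unchanged = backend_update_needed?(update_task: true, config_changed: false, task_id: 'abc', task_exists: true)
changed   = backend_update_needed?(update_task: true, config_changed: true, task_id: 'abc', task_exists: true)
no_task   = backend_update_needed?(update_task: true, config_changed: false, task_id: nil, task_exists: false)
vanished  = backend_update_needed?(update_task: true, config_changed: false, task_id: 'abc', task_exists: false)
disabled  = backend_update_needed?(update_task: false, config_changed: true, task_id: nil, task_exists: false)
```

Passing `update_task: false` therefore persists the schedule in Redis without touching the backend, even when the configuration changed.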

#to_config ⇒ Hash

Return a hash describing the configuration of this schedule.

Returns:

  • (Hash)

    The config description hash.



# File 'lib/cloudtasker/cron/schedule.rb', line 200

def to_config
  {
    id: id,
    cron: cron,
    worker: worker,
    args: args,
    queue: queue
  }
end

#to_h ⇒ Hash

Return a hash with all the schedule attributes.

Returns:

  • (Hash)

    The attributes hash.



# File 'lib/cloudtasker/cron/schedule.rb', line 215

def to_h
  to_config.merge(
    task_id: task_id,
    job_id: job_id
  )
end

#update(opts) ⇒ Object

Edit the object attributes and save the object in Redis.

Parameters:

  • opts (Hash)

    The attributes to edit.



# File 'lib/cloudtasker/cron/schedule.rb', line 267

def update(opts)
  assign_attributes(opts)
  save
end

#valid? ⇒ Boolean

Return true if the schedule is valid.

Returns:

  • (Boolean)

    True if the schedule is valid.



# File 'lib/cloudtasker/cron/schedule.rb', line 160

def valid?
  id && cron_schedule && worker
end

#worker_instance ⇒ Cloudtasker::WorkerWrapper

Return an instance of the underlying worker.

Returns:

  • (Cloudtasker::WorkerWrapper)

    The worker instance.

# File 'lib/cloudtasker/cron/schedule.rb', line 236

def worker_instance
  WorkerWrapper.new(worker_name: worker, job_args: args, job_queue: queue)
end