Class: Cloudtasker::UniqueJob::Job
- Inherits: Object
  - Object
    - Cloudtasker::UniqueJob::Job
- Defined in: lib/cloudtasker/unique_job/job.rb
Overview
Wrapper class for Cloudtasker::Worker delegating to lock and conflict strategies
Constant Summary
Instance Attribute Summary
- #call_opts ⇒ Object (readonly)
  Returns the value of attribute call_opts.
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Instance Method Summary
- #digest_hash ⇒ Hash
  Return a unique description of the job in hash format.
- #id ⇒ String
  Return the worker job ID.
- #initialize(worker, opts = {}) ⇒ Job (constructor)
  Build a new instance of the class.
- #lock! ⇒ Object
  Acquire a new unique job lock or check that the lock is currently allocated to this job.
- #lock_instance ⇒ Any
  Return the instantiated lock.
- #lock_ttl ⇒ Integer
  Return the Time To Live (TTL) that should be set in Redis for the lock key.
- #options ⇒ Hash
  Return the worker configuration options.
- #redis ⇒ Cloudtasker::RedisClient
  Return the Cloudtasker redis client.
- #unique_args ⇒ Array<any>
  Return the list of arguments used for job uniqueness.
- #unique_gid ⇒ String
  Return the Global ID of the unique job.
- #unique_id ⇒ String
  Return the ID of the unique job.
- #unlock! ⇒ Object
  Delete the job lock.
Constructor Details
#initialize(worker, opts = {}) ⇒ Job
Build a new instance of the class.
    # File 'lib/cloudtasker/unique_job/job.rb', line 19
    def initialize(worker, opts = {})
      @worker = worker
      @call_opts = opts
    end
Instance Attribute Details
#call_opts ⇒ Object (readonly)
Returns the value of attribute call_opts.
    # File 'lib/cloudtasker/unique_job/job.rb', line 8
    def call_opts
      @call_opts
    end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
    # File 'lib/cloudtasker/unique_job/job.rb', line 8
    def worker
      @worker
    end
Instance Method Details
#digest_hash ⇒ Hash
Return a unique description of the job in hash format.
    # File 'lib/cloudtasker/unique_job/job.rb', line 101
    def digest_hash
      @digest_hash ||= {
        class: worker.class.to_s,
        unique_args: unique_args
      }
    end
#id ⇒ String
Return the worker job ID.
    # File 'lib/cloudtasker/unique_job/job.rb', line 113
    def id
      worker.job_id
    end
#lock! ⇒ Object
Acquire a new unique job lock or check that the lock is currently allocated to this job.
Raises a `Cloudtasker::UniqueJob::LockError` if the lock is taken by another job.

    # File 'lib/cloudtasker/unique_job/job.rb', line 152
    def lock!
      lock_acquired = redis.set(unique_gid, id, nx: true, ex: lock_ttl)
      lock_already_acquired = !lock_acquired && redis.get(unique_gid) == id

      raise(LockError) unless lock_acquired || lock_already_acquired
    end
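The decision logic of #lock! can be tried without a Redis server. The following is a minimal sketch, assuming an in-memory stand-in for the two Redis calls involved; `FakeRedis`, `acquire_lock`, and the locally defined `LockError` are hypothetical names introduced for illustration and are not part of Cloudtasker.

```ruby
# Hypothetical in-memory stand-in for the two Redis calls used by #lock!.
class FakeRedis
  def initialize
    @store = {}
  end

  # Mimics SET key value NX: write only if the key is absent.
  # The ex: (TTL) argument is accepted but ignored in this sketch.
  def set(key, value, nx: false, ex: nil)
    return false if nx && @store.key?(key)

    @store[key] = value
    true
  end

  def get(key)
    @store[key]
  end
end

# Local error class for the sketch (stands in for the library's LockError).
class LockError < StandardError; end

# Same decision logic as #lock!: succeed if the lock was just acquired
# or is already held by this job ID, raise otherwise.
def acquire_lock(redis, unique_gid, job_id, ttl)
  lock_acquired = redis.set(unique_gid, job_id, nx: true, ex: ttl)
  lock_already_acquired = !lock_acquired && redis.get(unique_gid) == job_id
  raise LockError unless lock_acquired || lock_already_acquired

  true
end

redis = FakeRedis.new
gid = 'cloudtasker/unique_job/job/abc123' # illustrative key

acquire_lock(redis, gid, 'job-1', 600) # new lock
acquire_lock(redis, gid, 'job-1', 600) # same job again: still succeeds

begin
  acquire_lock(redis, gid, 'job-2', 600) # another job: rejected
rescue LockError
  puts 'lock is held by another job'
end
```

Because the second check compares the stored value with the job's own ID, calling the lock step twice for the same job is harmless, which matters when a job is locked at schedule time and checked again at execution time.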
#lock_instance ⇒ Any
Return the instantiated lock.
    # File 'lib/cloudtasker/unique_job/job.rb', line 75
    def lock_instance
      @lock_instance ||=
        begin
          # Infer lock class and get instance
          lock_name = options[:lock]
          lock_klass = Lock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
          lock_klass.new(self)
        rescue NameError
          DEFAULT_LOCK.new(self)
        end
    end
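The name inference in #lock_instance turns a snake_case option value into a CamelCase constant name and falls back to the default when no such constant exists. A minimal sketch of that step, assuming stand-in lock classes: the `Lock` module, its two classes, the `DEFAULT_LOCK` value, and the `lock_class_for` helper below are all defined locally for illustration.

```ruby
# Hypothetical stand-ins for the lock strategy classes.
module Lock
  class NoOp; end
  class UntilExecuted; end
end

# Assumed default for this sketch only.
DEFAULT_LOCK = Lock::NoOp

# Convert a snake_case lock name into a class under Lock,
# falling back to DEFAULT_LOCK when the constant cannot be resolved.
def lock_class_for(lock_name)
  class_name = lock_name.to_s.split('_').collect(&:capitalize).join
  Lock.const_get(class_name)
rescue NameError
  DEFAULT_LOCK
end

puts lock_class_for(:until_executed) # resolves Lock::UntilExecuted
puts lock_class_for(:missing_lock)   # unknown name: falls back to the default
puts lock_class_for(nil)             # no option set: falls back to the default
```

A missing option yields an empty constant name, which `const_get` rejects with a `NameError`, so the same rescue branch covers both an unset and a misspelled lock name.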
#lock_ttl ⇒ Integer
Return the Time To Live (TTL) that should be set in Redis for the lock key. Having a TTL on lock keys ensures that jobs do not end up stuck because of a deadlock.
The TTL is calculated using schedule time + expected max job duration.
The expected max job duration is set to 10 minutes by default. This value was chosen because it’s twice the default request timeout value in Cloud Run. This leaves enough room for queue lag (5 minutes) + job processing (5 minutes).
Queue lag is the most unpredictable factor here; job processing time matters less. Jobs running for more than 5 minutes should be split into sub-jobs to limit invocation time over HTTP. Cloudtasker batch jobs can help when a large job split into sub-jobs must still behave as one “atomic” unit.
The default lock key expiration of “time_at + 10 minutes” may look aggressive but it is still a better choice than potentially having real-time jobs stuck for X hours.
The expected max job duration can be configured via the `lock_ttl` option on the job itself.

    # File 'lib/cloudtasker/unique_job/job.rb', line 59
    def lock_ttl
      now = Time.now.to_i

      # Get scheduled at and lock duration
      scheduled_at = [call_opts[:time_at].to_i, now].compact.max
      lock_duration = (options[:lock_ttl] || Cloudtasker::UniqueJob.lock_ttl).to_i

      # Return TTL
      scheduled_at + lock_duration - now
    end
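To make the "schedule time + expected max job duration" rule concrete, here is a small worked sketch of the same arithmetic with the clock passed in explicitly. The `compute_lock_ttl` helper and the `DEFAULT_LOCK_TTL` constant are hypothetical names for this example; the 600-second value reflects the 10-minute default described above.

```ruby
# Assumed default for the sketch: 10 minutes, in seconds.
DEFAULT_LOCK_TTL = 10 * 60

# Same arithmetic as #lock_ttl, with "now" injected for reproducibility.
# time_at: scheduled execution time (epoch seconds), nil for immediate jobs.
# lock_ttl: per-job override of the expected max job duration.
def compute_lock_ttl(now:, time_at: nil, lock_ttl: nil)
  scheduled_at = [time_at.to_i, now].max
  lock_duration = (lock_ttl || DEFAULT_LOCK_TTL).to_i
  scheduled_at + lock_duration - now
end

now = 1_700_000_000

puts compute_lock_ttl(now: now)                      # 600  (immediate job)
puts compute_lock_ttl(now: now, time_at: now + 3600) # 4200 (1 hour delay + 10 minutes)
puts compute_lock_ttl(now: now, lock_ttl: 120)       # 120  (custom duration)
puts compute_lock_ttl(now: now, time_at: now - 50)   # 600  (past schedule time is clamped to now)
```

Taking the maximum of the schedule time and the current time means a job scheduled in the past never produces a TTL shorter than the lock duration itself.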
#options ⇒ Hash
Return the worker configuration options.
    # File 'lib/cloudtasker/unique_job/job.rb', line 29
    def options
      worker.class.
    end
#redis ⇒ Cloudtasker::RedisClient
Return the Cloudtasker redis client.
    # File 'lib/cloudtasker/unique_job/job.rb', line 141
    def redis
      @redis ||= Cloudtasker::RedisClient.new
    end
#unique_args ⇒ Array<any>
Return the list of arguments used for job uniqueness.
    # File 'lib/cloudtasker/unique_job/job.rb', line 92
    def unique_args
      worker.try(:unique_args, worker.job_args) || worker.job_args
    end
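The fallback in #unique_args means a worker may optionally define its own `unique_args` method to narrow which arguments count toward uniqueness. A minimal plain-Ruby sketch of that behavior follows; `PlainWorker`, `InvoiceWorker`, and `unique_args_for` are hypothetical and only mimic the small part of the worker interface used here, with `respond_to?` standing in for ActiveSupport's `try`.

```ruby
# Hypothetical worker exposing only job_args.
class PlainWorker
  attr_reader :job_args

  def initialize(job_args)
    @job_args = job_args
  end
end

# Hypothetical worker that customizes uniqueness.
class InvoiceWorker < PlainWorker
  # Only the first argument (an invoice ID here) counts for uniqueness.
  def unique_args(args)
    [args.first]
  end
end

# Plain-Ruby equivalent of:
#   worker.try(:unique_args, worker.job_args) || worker.job_args
def unique_args_for(worker)
  custom = worker.unique_args(worker.job_args) if worker.respond_to?(:unique_args)
  custom || worker.job_args
end

p unique_args_for(PlainWorker.new([1, 'a']))                  # all job args
p unique_args_for(InvoiceWorker.new([42, { notify: true }]))  # only the invoice ID
```

With the second worker, two jobs for invoice 42 that differ only in their trailing options would be treated as the same unique job.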
#unique_gid ⇒ String
Return the Global ID of the unique job. The gid includes the UniqueJob namespace.
    # File 'lib/cloudtasker/unique_job/job.rb', line 132
    def unique_gid
      [self.class.to_s.underscore, unique_id].join('/')
    end
#unique_id ⇒ String
Return the ID of the unique job.
    # File 'lib/cloudtasker/unique_job/job.rb', line 122
    def unique_id
      Digest::SHA256.hexdigest(digest_hash.to_json)
    end
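The following sketch shows how #digest_hash, #unique_id, and #unique_gid fit together, using only the Ruby standard library. The helper names, the worker class name, and the namespace string are assumptions for illustration; the namespace is what `underscore` would be expected to produce for this class name.

```ruby
require 'digest'
require 'json'

# Hash the worker class name and unique args, as #unique_id does
# with the output of #digest_hash.
def unique_id_for(worker_class_name, unique_args)
  digest_hash = { class: worker_class_name, unique_args: unique_args }
  Digest::SHA256.hexdigest(digest_hash.to_json)
end

# Prefix the ID with a namespace, as #unique_gid does.
# The default namespace here is an assumption for this sketch.
def unique_gid_for(unique_id, namespace = 'cloudtasker/unique_job/job')
  [namespace, unique_id].join('/')
end

id_a = unique_id_for('InvoiceWorker', [42])
id_b = unique_id_for('InvoiceWorker', [42])
id_c = unique_id_for('InvoiceWorker', [43])

puts id_a == id_b # true: same class and args give the same ID
puts id_a == id_c # false: different args give a different ID
puts unique_gid_for(id_a)
```

Since the ID depends only on the class name and the unique arguments, two jobs enqueued with identical inputs map to the same Redis key and therefore compete for the same lock.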
#unlock! ⇒ Object
Delete the job lock.
    # File 'lib/cloudtasker/unique_job/job.rb', line 162
    def unlock!
      locked_id = redis.get(unique_gid)
      redis.del(unique_gid) if locked_id == id
    end