Class: Cloudtasker::UniqueJob::Job

Inherits:
Object
Defined in:
lib/cloudtasker/unique_job/job.rb

Overview

Wrapper class for Cloudtasker::Worker delegating to lock and conflict strategies

Constant Summary

DEFAULT_LOCK = UniqueJob::Lock::NoOp

The default lock strategy to use. Defaults to “no lock”.


Constructor Details

#initialize(worker, opts = {}) ⇒ Job

Build a new instance of the class.

Parameters:



# File 'lib/cloudtasker/unique_job/job.rb', line 19

def initialize(worker, opts = {})
  @worker = worker
  @call_opts = opts
end

Instance Attribute Details

#call_opts ⇒ Object (readonly)

Returns the value of attribute call_opts.



# File 'lib/cloudtasker/unique_job/job.rb', line 8

def call_opts
  @call_opts
end

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



# File 'lib/cloudtasker/unique_job/job.rb', line 8

def worker
  @worker
end

Instance Method Details

#digest_hash ⇒ Hash

Return a unique description of the job in hash format.

Returns:

  • (Hash)

    Representation of the unique job in hash format.



# File 'lib/cloudtasker/unique_job/job.rb', line 101

def digest_hash
  @digest_hash ||= {
    class: worker.class.to_s,
    unique_args: unique_args
  }
end

#id ⇒ String

Return the worker job ID.

Returns:

  • (String)

    The worker job ID.



# File 'lib/cloudtasker/unique_job/job.rb', line 113

def id
  worker.job_id
end

#lock! ⇒ Object

Acquire a new unique job lock or check that the lock is currently allocated to this job.

Raises a `Cloudtasker::UniqueJob::LockError` if the lock is taken by another job.

Raises:

  • (Cloudtasker::UniqueJob::LockError)



# File 'lib/cloudtasker/unique_job/job.rb', line 152

def lock!
  lock_acquired = redis.set(unique_gid, id, nx: true, ex: lock_ttl)
  lock_already_acquired = !lock_acquired && redis.get(unique_gid) == id

  raise(LockError) unless lock_acquired || lock_already_acquired
end
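
The acquire-or-confirm behaviour described above can be tried without a Redis server. The following is a minimal sketch, assuming an in-memory stand-in for the two Redis calls that lock! makes; `FakeRedis`, `DemoLockError` and `demo_lock!` are hypothetical names for this illustration and are not part of Cloudtasker. The stand-in ignores the TTL.

```ruby
# In-memory stand-in for the two Redis calls used by lock! (the TTL is ignored here).
class FakeRedis
  def initialize
    @store = {}
  end

  # Mimics SET key value NX: writes only when the key is absent.
  def set(key, value, nx: false, ex: nil)
    return false if nx && @store.key?(key)

    @store[key] = value
    true
  end

  def get(key)
    @store[key]
  end
end

# Hypothetical error class standing in for the real lock error.
class DemoLockError < StandardError; end

# Same three steps as lock!, with the collaborators passed in explicitly.
def demo_lock!(redis, unique_gid, id, lock_ttl: 600)
  lock_acquired = redis.set(unique_gid, id, nx: true, ex: lock_ttl)
  lock_already_acquired = !lock_acquired && redis.get(unique_gid) == id
  raise(DemoLockError) unless lock_acquired || lock_already_acquired

  true
end

redis = FakeRedis.new
demo_lock!(redis, 'demo/key', 'job-1') # acquires the lock
demo_lock!(redis, 'demo/key', 'job-1') # same job ID: lock already held, no error

begin
  demo_lock!(redis, 'demo/key', 'job-2')
rescue DemoLockError
  puts 'job-2 rejected: lock held by job-1'
end
```

Calling the method again with the same job ID succeeds, so a retried job does not conflict with its own lock.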

#lock_instance ⇒ Any

Return the instantiated lock.

Returns:

  • (Any)

    The instantiated lock



# File 'lib/cloudtasker/unique_job/job.rb', line 75

def lock_instance
  @lock_instance ||=
    begin
      # Infer lock class and get instance
      lock_name = options[:lock]
      lock_klass = Lock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
      lock_klass.new(self)
    rescue NameError
      DEFAULT_LOCK.new(self)
    end
end
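
The lock class is inferred by converting the snake_case `lock` option into a CamelCase constant name, falling back to the default when no such constant exists. A minimal sketch of that lookup, assuming a stand-in module (`DemoLock` and `resolve_lock` are hypothetical names used only here):

```ruby
# Stand-in namespace playing the role of the Lock module.
module DemoLock
  class NoOp; end
  class UntilExecuted; end
end

# Same name conversion and fallback as lock_instance, returning the class only.
def resolve_lock(lock_name)
  DemoLock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
rescue NameError
  DemoLock::NoOp
end

resolve_lock(:until_executed) # => DemoLock::UntilExecuted
resolve_lock(:unknown_lock)   # => DemoLock::NoOp (constant not found)
resolve_lock(nil)             # => DemoLock::NoOp (empty constant name)
```

A missing or misspelled `lock` option therefore results in the default strategy rather than an error.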

#lock_ttl ⇒ Integer

Return the Time To Live (TTL) that should be set in Redis for the lock key. Having a TTL on lock keys ensures that jobs do not end up stuck due to a deadlock.

The TTL is calculated as the time remaining until the scheduled time plus the expected max job duration.

The expected max job duration is set to 10 minutes by default. This value was chosen because it’s twice the default request timeout value in Cloud Run. This leaves enough room for queue lag (5 minutes) + job processing (5 minutes).

Queue lag is certainly the most unpredictable factor here. Job processing time is less of a factor. Jobs running for more than 5 minutes should be split into sub-jobs to limit invocation time over HTTP. Cloudtasker batch jobs can help achieve that if you need a large job that is split into sub-jobs to remain “atomic”.

The default lock key expiration of “time_at + 10 minutes” may look aggressive but it is still a better choice than potentially having real-time jobs stuck for X hours.

The expected max job duration can be configured via the `lock_ttl` option on the job itself.

Returns:

  • (Integer)

    The TTL in seconds



# File 'lib/cloudtasker/unique_job/job.rb', line 59

def lock_ttl
  now = Time.now.to_i

  # Get scheduled at and lock duration
  scheduled_at = [call_opts[:time_at].to_i, now].compact.max
  lock_duration = (options[:lock_ttl] || Cloudtasker::UniqueJob.lock_ttl).to_i

  # Return TTL
  scheduled_at + lock_duration - now
end
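
To make the arithmetic concrete, here is a minimal sketch of the same calculation as a standalone method. `example_lock_ttl` is a hypothetical helper, `now` is passed in so the results are reproducible, and the 600-second duration is the 10-minute default described above.

```ruby
# Standalone version of the TTL arithmetic; `now` is injectable for the example.
def example_lock_ttl(time_at:, lock_duration:, now:)
  # A missing or past time_at is treated as "now".
  scheduled_at = [time_at.to_i, now].max
  scheduled_at + lock_duration - now
end

now = 1_700_000_000

# Job enqueued for immediate execution: TTL is the lock duration.
example_lock_ttl(time_at: nil, lock_duration: 600, now: now)        # => 600

# Job scheduled one hour ahead: TTL covers the wait plus the lock duration.
example_lock_ttl(time_at: now + 3600, lock_duration: 600, now: now) # => 4200

# Schedule time already in the past: treated like an immediate job.
example_lock_ttl(time_at: now - 120, lock_duration: 600, now: now)  # => 600
```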

#options ⇒ Hash

Return the worker configuration options.

Returns:

  • (Hash)

    The worker configuration options.



# File 'lib/cloudtasker/unique_job/job.rb', line 29

def options
  worker.class.cloudtasker_options_hash
end

#redis ⇒ Cloudtasker::RedisClient

Return the Cloudtasker redis client.

Returns:

  • (Cloudtasker::RedisClient)



# File 'lib/cloudtasker/unique_job/job.rb', line 141

def redis
  @redis ||= Cloudtasker::RedisClient.new
end

#unique_args ⇒ Array<any>

Return the list of arguments used for job uniqueness.

Returns:

  • (Array<any>)

    The list of unique arguments



# File 'lib/cloudtasker/unique_job/job.rb', line 92

def unique_args
  worker.try(:unique_args, worker.job_args) || worker.job_args
end
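
A worker can narrow what counts as "the same job" by defining its own `unique_args` method; otherwise all job arguments are used. The sketch below illustrates that fallback with plain Ruby objects. `DemoWorker`, `PlainWorker` and `demo_unique_args` are hypothetical names, and `respond_to?` is used in place of ActiveSupport's `try` so the example has no dependencies.

```ruby
# Hypothetical worker: only the first argument (e.g. a user ID) matters for uniqueness.
DemoWorker = Struct.new(:job_args) do
  def unique_args(args)
    [args[0]]
  end
end

# Hypothetical worker without a unique_args method.
PlainWorker = Struct.new(:job_args)

# Use the worker's own unique_args when defined, else fall back to all job arguments.
def demo_unique_args(worker)
  custom = worker.unique_args(worker.job_args) if worker.respond_to?(:unique_args)
  custom || worker.job_args
end

demo_unique_args(DemoWorker.new([42, 'timestamp']))  # => [42]
demo_unique_args(PlainWorker.new([42, 'timestamp'])) # => [42, "timestamp"]
```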

#unique_gid ⇒ String

Return the Global ID of the unique job. The gid includes the UniqueJob namespace.

Returns:

  • (String)

    The global ID of the job



# File 'lib/cloudtasker/unique_job/job.rb', line 132

def unique_gid
  [self.class.to_s.underscore, unique_id].join('/')
end

#unique_id ⇒ String

Return the ID of the unique job.

Returns:

  • (String)

    The ID of the job.



# File 'lib/cloudtasker/unique_job/job.rb', line 122

def unique_id
  Digest::SHA256.hexdigest(digest_hash.to_json)
end
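
The unique ID is a SHA-256 digest of the JSON form of the digest hash, and the global ID prefixes it with a namespace. A minimal sketch of both steps follows. The worker class name and arguments are made up, and the namespace string is an assumption: it is what underscoring the class name `Cloudtasker::UniqueJob::Job` would be expected to produce.

```ruby
require 'json'
require 'digest'

# Assumed namespace: the underscored form of the Cloudtasker::UniqueJob::Job class name.
DEMO_NAMESPACE = 'cloudtasker/unique_job/job'

# Digest of the worker class name plus its unique arguments.
def demo_unique_id(worker_class, unique_args)
  digest_hash = { class: worker_class, unique_args: unique_args }
  Digest::SHA256.hexdigest(digest_hash.to_json)
end

# Namespaced key, as it would be stored in Redis.
def demo_unique_gid(worker_class, unique_args)
  [DEMO_NAMESPACE, demo_unique_id(worker_class, unique_args)].join('/')
end

# Same class and arguments give the same key; different arguments give another key.
same = demo_unique_gid('MyWorker', [1, 'a']) == demo_unique_gid('MyWorker', [1, 'a'])
diff = demo_unique_gid('MyWorker', [1, 'a']) != demo_unique_gid('MyWorker', [2, 'a'])
puts "stable: #{same}, distinct: #{diff}"
```

Because the digest depends only on the class name and the unique arguments, two enqueues of the same job map to the same lock key regardless of their job IDs.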

#unlock! ⇒ Object

Delete the job lock.



# File 'lib/cloudtasker/unique_job/job.rb', line 162

def unlock!
  locked_id = redis.get(unique_gid)
  redis.del(unique_gid) if locked_id == id
end