Module: SideJob::JobMethods

Included in:
Job
Defined in:
lib/sidejob/job.rb

Overview

Methods shared between Job and Worker.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



4
5
6
# File 'lib/sidejob/job.rb', line 4

# Reader for the id attribute.
#
# @return [Object] the current value of the @id instance variable
def id
  @id
end

Instance Method Details

#==(other) ⇒ Boolean

Returns True if two jobs or workers have the same id.

Returns:

  • (Boolean)

    True if two jobs or workers have the same id



7
8
9
# File 'lib/sidejob/job.rb', line 7

# Equality is based purely on the id.
#
# @param other [Object] object to compare with
# @return [Boolean] false when other does not respond to #id, otherwise
#   the result of comparing both ids
def ==(other)
  return false unless other.respond_to?(:id)

  id == other.id
end

#add_alias(name) ⇒ Object

Add an alias for the job.

Parameters:

  • name (String)

    Alias for the job. Must begin with an alphabetic character.

Raises:

  • (RuntimeError)

    Error if name is invalid or the name already refers to another job



61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/sidejob/job.rb', line 61

# Add an alias for the job.
#
# The alias is written to the global 'jobs:aliases' hash (alias => job id) and
# to this job's own alias set. If the alias already maps to this job nothing
# is written.
#
# @param name [String] Alias for the job. Must begin with an alphabetic character.
# @raise [RuntimeError] if name is invalid or the name already refers to another job
def add_alias(name)
  check_exists
  # \A anchors at the start of the whole string. ^ anchors at the start of any
  # line, so a name such as "1\nabc" would wrongly pass the check.
  raise "#{name} is not a valid alias" unless name =~ /\A[[:alpha:]]/
  current = SideJob.redis.hget('jobs:aliases', name)
  if current
    raise "#{name} is already used by job #{current}" if current.to_i != id
  else
    SideJob.redis.multi do |multi|
      multi.hset 'jobs:aliases', name, id
      multi.sadd "#{redis_key}:aliases", name
    end
  end
end

#adopt(orphan, name) ⇒ Object

Adopt a parent-less job as a child of this job.

Parameters:

  • orphan (SideJob::Job)

    Job that has no parent

  • name (String)

    Name of child job (must be unique among children)



198
199
200
201
202
203
204
205
206
207
208
# File 'lib/sidejob/job.rb', line 198

# Adopt a parent-less job as a child of this job.
#
# Writes this job's id (JSON encoded) to the orphan's ":parent" key and records
# the orphan's id under the given name in this job's ":children" hash.
#
# @param orphan [SideJob::Job] Job that has no parent
# @param name [String] Name of child job (must be unique among children)
# @raise [RuntimeError] if orphan is this job, already has a parent, or the
#   name is nil or already used by another child
def adopt(orphan, name)
  check_exists

  if orphan == self
    raise "Job #{id} cannot adopt itself as a child"
  end
  unless orphan.parent.nil?
    raise "Job #{id} cannot adopt job #{orphan.id} as it already has a parent"
  end
  name_unusable = name.nil? || !child(name).nil?
  if name_unusable
    raise "Job #{id} cannot adopt job #{orphan.id} as child name #{name} is not unique"
  end

  parent_key = "#{orphan.redis_key}:parent"
  children_key = "#{redis_key}:children"
  SideJob.redis.multi do |tx|
    tx.set parent_key, id.to_json
    tx.hset children_key, name, orphan.id
  end
end

#aliases ⇒ Array<String>

Returns all aliases for the job.

Returns:

  • (Array<String>)

    Job aliases



54
55
56
# File 'lib/sidejob/job.rb', line 54

# Returns all aliases for the job, read from the job's ":aliases" set.
#
# @return [Array<String>] Job aliases
def aliases
  SideJob.redis.smembers("#{redis_key}:aliases")
end

#child(name) ⇒ SideJob::Job?

Returns a child job by name.

Parameters:

  • name (Symbol, String)

    Child job name to look up

Returns:



160
161
162
# File 'lib/sidejob/job.rb', line 160

# Returns a child job by name.
#
# Looks the name up in this job's ":children" hash and hands the stored id to
# SideJob.find.
#
# @param name [Symbol, String] Child job name to look up
# @return [SideJob::Job, nil] whatever SideJob.find returns for the stored id
def child(name)
  child_id = SideJob.redis.hget("#{redis_key}:children", name)
  SideJob.find(child_id)
end

#children ⇒ Hash<String => SideJob::Job>

Returns all children jobs.

Returns:



166
167
168
# File 'lib/sidejob/job.rb', line 166

# Returns all children jobs.
#
# @return [Hash<String => SideJob::Job>] child name mapped to the result of
#   SideJob.find for the id stored in the ":children" hash
def children
  stored = SideJob.redis.hgetall("#{redis_key}:children")
  stored.each_with_object({}) do |(child_name, child_id), jobs|
    jobs[child_name] = SideJob.find(child_id)
  end
end

#delete ⇒ Boolean

Deletes the job and all children jobs (recursively) if all are terminated.

Returns:

  • (Boolean)

    Whether the job was deleted



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/sidejob/job.rb', line 212

# Deletes the job and all children jobs (recursively) if all are terminated.
#
# Steps: detach from the parent (if any), remove this job's keys and alias
# registrations in one MULTI, clear each child's parent link, delete each
# child recursively, then publish a {deleted: true} message.
#
# @return [Boolean] Whether the job was deleted (false if not terminated)
def delete
  return false unless terminated?

  parent = self.parent
  parent.disown(self) if parent

  # Snapshot before the keys backing them are removed below.
  children = self.children
  aliases = self.aliases

  # delete all SideJob keys and disown all children
  ports = inports.map(&:redis_key) + outports.map(&:redis_key)
  suffixes = %w{worker status state aliases parent children inports outports inports:default outports:default inports:channels outports:channels created_at created_by ran_at}
  SideJob.redis.multi do |multi|
    multi.srem 'jobs', id
    multi.del redis_key
    multi.del ports + suffixes.map { |x| "#{redis_key}:#{x}" }
    # The parent link is the plain key "<child key>:parent" (written by #adopt,
    # removed by #disown), so it has to be cleared with DEL on that key rather
    # than HDEL on the child's base key.
    children.each_value { |child| multi.del "#{child.redis_key}:parent" }
    aliases.each { |name| multi.hdel('jobs:aliases', name) }
  end

  # recursively delete all children
  children.each_value do |child|
    child.delete
  end

  publish({deleted: true})
  return true
end

#disown(name_or_job) ⇒ Object

Disown a child job so that it no longer has a parent.

Parameters:

  • name_or_job (String, SideJob::Job)

    Name or child job to disown



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/sidejob/job.rb', line 178

# Disown a child job so that it no longer has a parent.
#
# Removes the child's ":parent" key and the child's entry in this job's
# ":children" hash in one MULTI.
#
# @param name_or_job [String, SideJob::Job] Name or child job to disown
# @raise [RuntimeError] if the given job is not a child, or no child has the
#   given name
def disown(name_or_job)
  if name_or_job.is_a?(SideJob::Job)
    job = name_or_job
    # Hash#key yields just the child's name. Hash#rassoc would yield the
    # [name, job] pair, which would then be sent to HDEL as the field list.
    name = children.key(job)
    raise "Job #{id} cannot disown job #{job.id} as it is not a child" unless name
  else
    name = name_or_job
    job = child(name)
    raise "Job #{id} cannot disown non-existent child #{name}" unless job
  end

  SideJob.redis.multi do |multi|
    multi.del "#{job.redis_key}:parent"
    multi.hdel "#{redis_key}:children", name
  end
end

#eql?(other) ⇒ Boolean

Returns:

  • (Boolean)

See Also:



12
13
14
# File 'lib/sidejob/job.rb', line 12

# Same comparison as #==.
#
# @param other [Object] object to compare with
# @return [Boolean] result of self == other
def eql?(other)
  self.==(other)
end

#exists? ⇒ Boolean

Returns whether the job still exists.

Returns:

  • (Boolean)

    Returns true if this job exists and has not been deleted



29
30
31
# File 'lib/sidejob/job.rb', line 29

# Checks whether the job id is a member of the 'jobs' set.
#
# @return [Boolean] true if this job exists and has not been deleted
def exists?
  SideJob.redis.sismember('jobs', id)
end

#get(key) ⇒ Object?

Returns some data from the job’s internal state.

Parameters:

  • key (Symbol, String)

    Retrieve value for the given key

Returns:

  • (Object, nil)

    Value from the job state or nil if key does not exist



312
313
314
315
316
# File 'lib/sidejob/job.rb', line 312

# Returns some data from the job's internal state.
#
# Values are stored as JSON in the job's ":state" hash and decoded on read.
#
# @param key [Symbol, String] Retrieve value for the given key
# @return [Object, nil] Value from the job state or nil if key does not exist
def get(key)
  check_exists
  raw = SideJob.redis.hget("#{redis_key}:state", key)
  return nil unless raw

  # The stored text is wrapped in [] before parsing and then unwrapped,
  # presumably so that bare scalars decode the same way as objects and arrays.
  JSON.parse("[#{raw}]").first
end

#hash ⇒ Fixnum

Returns Hash value based on the id.

Returns:

  • (Fixnum)

    Hash value based on the id



17
18
19
# File 'lib/sidejob/job.rb', line 17

# Hash value derived from the id, matching the id-based #== / #eql?.
#
# @return [Fixnum] Hash value based on the id
def hash
  id.hash
end

#info ⇒ Hash

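Returns information about the job: the worker's queue, class and args, plus created_by, created_at and ran_at.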

Returns:

  • (Hash)


284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/sidejob/job.rb', line 284

# Returns basic information about the job.
#
# Reads the ":worker", ":created_by", ":created_at" and ":ran_at" keys in one
# MULTI. The worker value is JSON and supplies queue, class and args.
#
# @return [Hash] with keys :queue, :class, :args, :created_by, :created_at, :ran_at
def info
  check_exists
  replies = SideJob.redis.multi do |tx|
    tx.get "#{redis_key}:worker"
    tx.get "#{redis_key}:created_by"
    tx.get "#{redis_key}:created_at"
    tx.get "#{redis_key}:ran_at"
  end

  worker = JSON.parse(replies[0])
  {
    queue: worker['queue'],
    class: worker['class'],
    args: worker['args'],
    created_by: replies[1],
    created_at: replies[2],
    ran_at: replies[3],
  }
end

#inports ⇒ Array<SideJob::Port>

Gets all input ports.

Returns:



256
257
258
# File 'lib/sidejob/job.rb', line 256

# Gets all input ports.
#
# @return [Array<SideJob::Port>] result of all_ports for the :in direction
def inports
  all_ports(:in)
end

#inports=(ports) ⇒ Object

Sets the input ports for the job. The ports are merged with the worker configuration. Any current ports that are not in the new port set are deleted (including any data on those ports).

Parameters:

  • ports (Hash{Symbol,String => Hash})

    Input port configuration. Port name to options.



264
265
266
# File 'lib/sidejob/job.rb', line 264

# Sets the input ports for the job by delegating to set_ports with :in.
#
# @param ports [Hash{Symbol,String => Hash}] Input port configuration. Port name to options.
def inports=(ports)
  set_ports(:in, ports)
end

#input(name) ⇒ SideJob::Port

Returns an input port.

Parameters:

  • name (Symbol, String)

    Name of the port

Returns:



243
244
245
# File 'lib/sidejob/job.rb', line 243

# Returns an input port.
#
# Builds a new SideJob::Port for this job with direction :in; nothing is
# looked up here.
#
# @param name [Symbol, String] Name of the port
# @return [SideJob::Port]
def input(name)
  SideJob::Port.new(self, :in, name)
end

#lock(ttl, retries: 3, retry_delay: 0.2) ⇒ String?

Acquire a lock on the job with a given expiration time.

Parameters:

  • ttl (Fixnum)

    Lock expiration in seconds

  • retries (Fixnum) (defaults to: 3)

    Number of attempts to retry getting lock

  • retry_delay (Float) (defaults to: 0.2)

    Maximum seconds to wait (actual will be randomized) before retry getting lock

Returns:

  • (String, nil)

    Lock token that should be passed to #unlock or nil if lock was not acquired



340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/sidejob/job.rb', line 340

# Acquire a lock on the job with a given expiration time.
#
# Each attempt tries to set the job's ":lock" key to a fresh random token, only
# if the key does not exist yet (nx) and with the given expiry (ex).
#
# @param ttl [Fixnum] Lock expiration in seconds
# @param retries [Fixnum] Number of attempts made to get the lock
# @param retry_delay [Float] Maximum seconds to wait (actual will be randomized)
#   between attempts
# @return [String, nil] Lock token that should be passed to #unlock or nil if
#   lock was not acquired
def lock(ttl, retries: 3, retry_delay: 0.2)
  check_exists
  retries.times do |attempt|
    token = SecureRandom.uuid
    # Options are passed without braces so they reach the client as keyword
    # arguments on Ruby 3 and still as a trailing hash on older Rubies.
    return token if SideJob.redis.set("#{redis_key}:lock", token, nx: true, ex: ttl)

    # Waiting only makes sense when another attempt follows.
    sleep Random.rand(retry_delay) if attempt < retries - 1
  end
  nil # lock not acquired
end

#outports ⇒ Array<SideJob::Port>

Gets all output ports.

Returns:



270
271
272
# File 'lib/sidejob/job.rb', line 270

# Gets all output ports.
#
# @return [Array<SideJob::Port>] result of all_ports for the :out direction
def outports
  all_ports(:out)
end

#outports=(ports) ⇒ Object

Sets the output ports for the job. The ports are merged with the worker configuration. Any current ports that are not in the new port set are deleted (including any data on those ports).

Parameters:

  • ports (Hash{Symbol,String => Hash})

    Output port configuration. Port name to options.



278
279
280
# File 'lib/sidejob/job.rb', line 278

# Sets the output ports for the job by delegating to set_ports with :out.
#
# @param ports [Hash{Symbol,String => Hash}] Output port configuration. Port name to options.
def outports=(ports)
  set_ports(:out, ports)
end

#output(name) ⇒ SideJob::Port

Returns an output port

Parameters:

  • name (Symbol, String)

    Name of the port

Returns:



250
251
252
# File 'lib/sidejob/job.rb', line 250

# Returns an output port.
#
# Builds a new SideJob::Port for this job with direction :out; nothing is
# looked up here.
#
# @param name [Symbol, String] Name of the port
# @return [SideJob::Port]
def output(name)
  SideJob::Port.new(self, :out, name)
end

#parent ⇒ SideJob::Job?

Returns the parent job.

Returns:



172
173
174
# File 'lib/sidejob/job.rb', line 172

# Returns the parent job.
#
# Reads the id stored in this job's ":parent" key and hands it to SideJob.find.
#
# @return [SideJob::Job, nil] whatever SideJob.find returns for the stored id
def parent
  parent_id = SideJob.redis.get("#{redis_key}:parent")
  SideJob.find(parent_id)
end

#publish(message) ⇒ Object

Publishes a message to the job’s channel.

Parameters:

  • message (Object)

    JSON encodable message



376
377
378
# File 'lib/sidejob/job.rb', line 376

# Publishes a message to the job's channel, "/sidejob/job/<id>".
#
# @param message [Object] JSON encodable message
def publish(message)
  SideJob.publish("/sidejob/job/#{id}", message)
end

#queue(queue, klass, **options) ⇒ Object

Queues a child job, setting parent and by to self.

See Also:



152
153
154
155
# File 'lib/sidejob/job.rb', line 152

# Queues a child job, setting parent and by to self.
#
# Any :parent or :by given in options is overridden with this job and
# "job:<id>" respectively.
#
# @param queue [String] queue name, passed through to SideJob.queue
# @param klass [String] worker class name, passed through to SideJob.queue
# @param options [Hash] further options passed through to SideJob.queue
# @return [Object] whatever SideJob.queue returns
def queue(queue, klass, **options)
  check_exists
  # Splat the merged options so they arrive as keyword arguments on Ruby 3,
  # where a positional hash is no longer converted to keywords implicitly.
  SideJob.queue(queue, klass, **options.merge(parent: self, by: "job:#{id}"))
end

#redis_key ⇒ String Also known as: to_s

Returns Prefix for all redis keys related to this job.

Returns:

  • (String)

    Prefix for all redis keys related to this job



22
23
24
# File 'lib/sidejob/job.rb', line 22

# Prefix for all redis keys related to this job: "job:" followed by the id.
#
# @return [String] Prefix for all redis keys related to this job
def redis_key
  "job:#{id}"
end

#refresh_lock(ttl) ⇒ Boolean

Refresh the lock expiration.

Parameters:

  • ttl (Fixnum)

    Refresh lock expiration for the given time in seconds

Returns:

  • (Boolean)

    Whether the timeout was set



356
357
358
359
# File 'lib/sidejob/job.rb', line 356

# Refresh the lock expiration by setting a new expiry on the ":lock" key.
#
# @param ttl [Fixnum] Refresh lock expiration for the given time in seconds
# @return [Boolean] Whether the timeout was set
def refresh_lock(ttl)
  check_exists
  SideJob.redis.expire("#{redis_key}:lock", ttl)
end

#remove_alias(name) ⇒ Object

Remove an alias for the job.

Parameters:

  • name (String)

    Alias to remove for the job

Raises:

  • (RuntimeError)

    Error if name is not an alias for this job



78
79
80
81
82
83
84
85
# File 'lib/sidejob/job.rb', line 78

# Remove an alias for the job.
#
# Drops the alias from the global 'jobs:aliases' hash and from this job's own
# alias set in one MULTI.
#
# @param name [String] Alias to remove for the job
# @raise [RuntimeError] if name is not an alias for this job
def remove_alias(name)
  check_exists
  alias_set = "#{redis_key}:aliases"
  unless SideJob.redis.sismember(alias_set, name)
    raise "#{name} is not an alias for job #{id}"
  end

  SideJob.redis.multi do |tx|
    tx.hdel 'jobs:aliases', name
    tx.srem alias_set, name
  end
end

#run(parent: false, force: false, at: nil, wait: nil) ⇒ SideJob::Job?

Run the job. This method ensures that the job runs at least once from the beginning. If the job is currently running, it will run again. Just like sidekiq, we make no guarantees that the job will not be run more than once. Unless force is set, the job will only be run if the status is running, queued, suspended, or completed.

Parameters:

  • parent (Boolean) (defaults to: false)

    Whether to run parent job instead of this one

  • force (Boolean) (defaults to: false)

    Whether to run if job is terminated (default false)

  • at (Time, Float) (defaults to: nil)

    Time to schedule the job, otherwise queue immediately

  • wait (Float) (defaults to: nil)

    Run in the specified number of seconds

Returns:

  • (SideJob::Job, nil)

    The job that was run or nil if no job was run



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/sidejob/job.rb', line 97

# Run the job.
#
# Sets the status to 'queued' and hands the job to sidekiq_queue, optionally
# with a scheduled time. Unless force is set, nothing happens when the current
# status is not one of running, queued, suspended or completed.
#
# @param parent [Boolean] Whether to run parent job instead of this one
# @param force [Boolean] Whether to run if job is terminated (default false)
# @param at [Time, Float] Time to schedule the job, otherwise queue immediately
# @param wait [Float] Run in the specified number of seconds
# @return [SideJob::Job, nil] The job that was run or nil if no job was run
def run(parent: false, force: false, at: nil, wait: nil)
  if parent
    # The keyword shadows the method, so the method needs an explicit receiver.
    parent_job = self.parent
    return nil unless parent_job
    return parent_job.run(force: force, at: at, wait: wait)
  end

  runnable = force || %w{running queued suspended completed}.include?(status)
  return nil unless runnable

  self.status = 'queued'

  # at takes precedence over wait; with neither, the time stays nil.
  time =
    if at
      at.is_a?(Time) ? at.to_f : at
    elsif wait
      Time.now.to_f + wait
    end
  sidekiq_queue(time)

  self
end

#set(data) ⇒ Object

Sets values in the job’s internal state.

Parameters:

  • data (Hash{String,Symbol => Object})

    Data to update: objects should be JSON encodable

Raises:

  • (RuntimeError)

    Error raised if job no longer exists



321
322
323
324
325
# File 'lib/sidejob/job.rb', line 321

# Sets values in the job's internal state.
#
# Each value is JSON encoded and written to the job's ":state" hash. An empty
# data set writes nothing.
#
# @param data [Hash{String,Symbol => Object}] Data to update: objects should be JSON encodable
# @raise [RuntimeError] Error raised if job no longer exists
def set(data)
  check_exists
  return if data.size < 1

  encoded = data.map { |field, value| [field, value.to_json] }
  SideJob.redis.hmset("#{redis_key}:state", *encoded.flatten)
end

#state ⇒ Hash{String => Object}

Returns the entirety of the job’s internal state.

Returns:

  • (Hash{String => Object})

    Job internal state



302
303
304
305
306
307
# File 'lib/sidejob/job.rb', line 302

# Returns the entirety of the job's internal state.
#
# Every value of the job's ":state" hash is JSON decoded in place.
#
# @return [Hash{String => Object}] Job internal state
def state
  check_exists
  stored = SideJob.redis.hgetall("#{redis_key}:state")
  # Only existing keys are reassigned, so updating while iterating is safe.
  stored.each do |field, json|
    stored[field] = JSON.parse("[#{json}]")[0]
  end
  stored
end

#status ⇒ String

Retrieve the job’s status.

Returns:

  • (String)

    Job status



35
36
37
38
# File 'lib/sidejob/job.rb', line 35

# Retrieve the job's status from its ":status" key.
#
# @return [String] Job status
def status
  check_exists
  SideJob.redis.get("#{redis_key}:status")
end

#status=(status) ⇒ Object

Set the job status.

Parameters:

  • status (String)

    The new job status



42
43
44
45
46
47
48
49
50
# File 'lib/sidejob/job.rb', line 42

# Set the job status.
#
# The new status is swapped in with GETSET. A {status: ...} message is published
# only when the status actually changed and the worker configuration does not
# set 'status_publish' to false.
#
# @param status [String] The new job status
def status=(status)
  check_exists
  previous = SideJob.redis.getset("#{redis_key}:status", status)
  return if previous == status
  return if worker_config['status_publish'] == false

  SideJob::Port.group(log: false) do
    publish({status: status})
  end
end

#terminate(recursive: false) ⇒ SideJob::Job

Prepare to terminate the job. Sets status to ‘terminating’. Then queues the job so that its shutdown method if it exists can be run. After shutdown, the status will be ‘terminated’. If the job is currently running, it will finish running first. If the job is already terminated, it does nothing. To start the job after termination, call #run with force: true.

Parameters:

  • recursive (Boolean) (defaults to: false)

    If true, recursively terminate all children (default false)

Returns:



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/sidejob/job.rb', line 137

# Prepare to terminate the job.
#
# Unless the status is already 'terminated', sets it to 'terminating' and
# queues the job via sidekiq_queue. With recursive set, every child is
# terminated recursively as well.
#
# @param recursive [Boolean] If true, recursively terminate all children (default false)
# @return [SideJob::Job] self
def terminate(recursive: false)
  unless status == 'terminated'
    self.status = 'terminating'
    sidekiq_queue
  end

  children.each_value { |kid| kid.terminate(recursive: true) } if recursive

  self
end

#terminated? ⇒ Boolean

Returns whether the job and all its children are terminated.

Returns:

  • (Boolean)

    True if this job and all children recursively are terminated



121
122
123
124
125
126
127
# File 'lib/sidejob/job.rb', line 121

# Checks that this job and, recursively, every child has status 'terminated'.
#
# @return [Boolean] True if this job and all children recursively are terminated
def terminated?
  return false unless status == 'terminated'

  children.values.all? { |kid| kid.terminated? }
end

#unlock(token) ⇒ Boolean

Unlock job by deleting the lock only if it equals the lock token.

Parameters:

  • token (String)

    Token returned by #lock

Returns:

  • (Boolean)

    Whether the job was unlocked



364
365
366
367
368
369
370
371
372
# File 'lib/sidejob/job.rb', line 364

# Unlock job by deleting the lock only if it equals the lock token.
#
# The compare-and-delete is done in a single server-side script so the lock
# value cannot change between the check and the delete.
#
# @param token [String] Token returned by #lock
# @return [Boolean] Whether the job was unlocked
def unlock(token)
  check_exists
  script = '
    if redis.call("get",KEYS[1]) == ARGV[1] then
      return redis.call("del",KEYS[1])
    else
      return 0
    end'
  script_args = { keys: ["#{redis_key}:lock"], argv: [token] }
  deleted = SideJob.redis.eval(script, script_args)
  deleted == 1
end

#unset(*fields) ⇒ Object

Unsets some fields in the job’s internal state.

Parameters:

  • fields (Array<String,Symbol>)

    Fields to unset

Raises:

  • (RuntimeError)

    Error raised if job no longer exists



330
331
332
333
# File 'lib/sidejob/job.rb', line 330

# Unsets some fields in the job's internal state.
#
# Removes the given fields from the job's ":state" hash. Calling it with no
# fields writes nothing.
#
# @param fields [Array<String,Symbol>] Fields to unset
# @raise [RuntimeError] Error raised if job no longer exists
def unset(*fields)
  # Same existence guard as #set, #get and #state; the documented contract says
  # this raises once the job is gone.
  check_exists
  return unless fields.length > 0
  SideJob.redis.hdel "#{redis_key}:state", fields
end