Class: Funktor::Job

Inherits:
Object
  • Object
show all
Includes:
ShardUtils
Defined in:
lib/funktor/job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ShardUtils

#calculate_shard

Constructor Details

#initialize(job_string) ⇒ Job

Returns a new instance of Job.



8
9
10
# File 'lib/funktor/job.rb', line 8

def initialize(job_string)
  @job_string = job_string
end

Instance Attribute Details

#job_dataObject

Returns the value of attribute job_data.



7
8
9
# File 'lib/funktor/job.rb', line 7

def job_data
  @job_data
end

#job_stringObject

Returns the value of attribute job_string.



6
7
8
# File 'lib/funktor/job.rb', line 6

def job_string
  @job_string
end

Instance Method Details

#can_retryObject



138
139
140
# File 'lib/funktor/job.rb', line 138

def can_retry
  self.retries < retry_limit
end

#default_seconds_to_delay(count) ⇒ Object

delayed_job and sidekiq use the same basic formula



126
127
128
# File 'lib/funktor/job.rb', line 126

def default_seconds_to_delay(count)
  (count**4) + 15 + (rand(30) * (count + 1))
end

#delayObject



72
73
74
75
76
77
78
# File 'lib/funktor/job.rb', line 72

def delay
  delay = (perform_at - Time.now.utc).to_i
  if delay < 0
    delay = 0
  end
  return delay
end

#delay=(delay) ⇒ Object



80
81
82
# File 'lib/funktor/job.rb', line 80

def delay=(delay)
  job_data["perform_at"] = Time.now.utc + delay
end

#error=(error) ⇒ Object



96
97
98
99
100
101
# File 'lib/funktor/job.rb', line 96

def error=(error)
  # TODO We should maybe compress this?
  job_data["error_class"] = error.class.name
  job_data["error_message"] = error.message
  job_data["error_backtrace"] = Funktor.dump_json(error.backtrace)
end

#error_backtraceObject



92
93
94
# File 'lib/funktor/job.rb', line 92

def error_backtrace
  job_data["error_backtrace"].present? ? Funktor.parse_json(job_data["error_backtrace"]) : []
end

#error_classObject



84
85
86
# File 'lib/funktor/job.rb', line 84

def error_class
  job_data["error_class"]
end

#error_messageObject



88
89
90
# File 'lib/funktor/job.rb', line 88

def error_message
  job_data["error_message"]
end

#executeObject



103
104
105
# File 'lib/funktor/job.rb', line 103

def execute
  worker_class.new.perform(*worker_params)
end

#increment_retriesObject



111
112
113
114
115
# File 'lib/funktor/job.rb', line 111

def increment_retries
  self.retries ||= 0
  self.retries += 1
  self.delay = seconds_to_delay(retries)
end

#is_retry?Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/funktor/job.rb', line 56

def is_retry?
  job_data["retries"].present?
end

#job_idObject



40
41
42
# File 'lib/funktor/job.rb', line 40

def job_id
  job_data["job_id"]
end

#perform_atObject



64
65
66
67
68
69
70
# File 'lib/funktor/job.rb', line 64

def perform_at
  if job_data["perform_at"].present?
    job_data["perform_at"].is_a?(Time) ? job_data["perform_at"] : Time.parse(job_data["perform_at"])
  else
    Time.now.utc
  end
end

#queueObject



16
17
18
# File 'lib/funktor/job.rb', line 16

def queue
  job_data["queue"] || 'default'
end

#retriesObject



52
53
54
# File 'lib/funktor/job.rb', line 52

def retries
  job_data["retries"] || 0
end

#retries=(retries) ⇒ Object



60
61
62
# File 'lib/funktor/job.rb', line 60

def retries=(retries)
  job_data["retries"] = retries
end

#retry_limitObject



134
135
136
# File 'lib/funktor/job.rb', line 134

def retry_limit
  worker_class&.custom_retry_limit || 25
end

#retry_queue_urlObject



142
143
144
145
146
147
148
# File 'lib/funktor/job.rb', line 142

def retry_queue_url
  worker_class&.custom_queue_url || ENV['FUNKTOR_INCOMING_JOB_QUEUE']
rescue NameError, TypeError
  # In the web ui we may not have access to the the worker classes
  # TODO : We should mayb handle this differently somehow? This just feels a bit icky...
  ENV['FUNKTOR_INCOMING_JOB_QUEUE']
end

#seconds_to_delay(count) ⇒ Object



117
118
119
120
121
122
123
# File 'lib/funktor/job.rb', line 117

def seconds_to_delay(count)
  if worker_class&.funktor_retry_in_block
    worker_class.funktor_retry_in_block.call(count)
  else
    default_seconds_to_delay(count)
  end
end

#shardObject



44
45
46
# File 'lib/funktor/job.rb', line 44

def shard
  calculate_shard(job_data["job_id"])
end

#to_json(arg = nil) ⇒ Object



130
131
132
# File 'lib/funktor/job.rb', line 130

def to_json(arg = nil)
  Funktor.dump_json(job_data)
end

#work_queue_urlObject



20
21
22
23
24
25
26
# File 'lib/funktor/job.rb', line 20

def work_queue_url
  queue_name = self.queue
  queue_constant = "FUNKTOR_#{queue_name.underscore.upcase}_QUEUE"
  Funktor.logger.debug "queue_constant = #{queue_constant}"
  Funktor.logger.debug "ENV value = #{ENV[queue_constant]}"
  ENV[queue_constant] || ENV['FUNKTOR_DEFAULT_QUEUE']
end

#worker_classObject



107
108
109
# File 'lib/funktor/job.rb', line 107

def worker_class
  @klass ||= Object.const_get worker_class_name
end

#worker_class_nameObject



36
37
38
# File 'lib/funktor/job.rb', line 36

def worker_class_name
  job_data["worker"]
end

#worker_class_name_for_metricsObject



28
29
30
31
32
33
34
# File 'lib/funktor/job.rb', line 28

def worker_class_name_for_metrics
  if job_data["worker"] == "ActiveJob::QueueAdapters::FunktorAdapter::JobWrapper"
    job_data["wrapped"]
  else
    job_data["worker"]
  end
end

#worker_paramsObject



48
49
50
# File 'lib/funktor/job.rb', line 48

def worker_params
  job_data["worker_params"]
end