Class: Jets::Rails::Job::Queue
- Inherits:
-
Object
- Object
- Jets::Rails::Job::Queue
show all
- Extended by:
- Memoist
- Includes:
- Util::Logging
- Defined in:
- lib/jets/rails/job/queue.rb,
lib/jets/rails/job/queue/url.rb,
lib/jets/rails/job/queue/check.rb
Defined Under Namespace
Classes: Check, Error, Url
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(job, timestamp = nil) ⇒ Queue
Returns a new instance of Queue.
11
12
13
14
|
# File 'lib/jets/rails/job/queue.rb', line 11
# Builds a queue wrapper around a single job.
#
# @param job [Object] the job to enqueue; stored unchanged in @job
# @param timestamp [Object, nil] optional scheduled time; stored unchanged in @timestamp
def initialize(job, timestamp = nil)
  @job, @timestamp = job, timestamp
end
|
Instance Attribute Details
#job ⇒ Object
Returns the value of attribute job.
10
11
12
|
# File 'lib/jets/rails/job/queue.rb', line 10
# Reader for the job handed to the constructor.
#
# @return [Object] the current value of @job
def job = @job
|
#timestamp ⇒ Object
Returns the value of attribute timestamp.
10
11
12
|
# File 'lib/jets/rails/job/queue.rb', line 10
# Reader for the timestamp handed to the constructor.
#
# @return [Object, nil] the current value of @timestamp
def timestamp = @timestamp
|
Instance Method Details
#delay_seconds ⇒ Object
69
70
71
72
73
74
75
|
# File 'lib/jets/rails/job/queue.rb', line 69
# Computes how many whole seconds remain from now until @timestamp.
#
# @return [Integer] 0 when no timestamp is set or it lies in the past,
#   otherwise the floored number of seconds remaining
# @raise [ArgumentError] if the remaining delay is greater than 15 minutes
def delay_seconds
  # No timestamp means nothing to wait for, instead of failing on nil arithmetic.
  return 0 unless @timestamp

  max_delay = 15 * 60 # 15 minutes, in seconds

  # to_f accepts both a numeric epoch value and a Time instance.
  remaining = (@timestamp.to_f - Time.now.to_f).floor
  remaining = 0 if remaining.negative?
  raise ArgumentError, "Unable to queue a job with a delay greater than 15 minutes" if remaining > max_delay

  remaining
end
|
#enqueue ⇒ Object
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/jets/rails/job/queue.rb', line 16
# Sends the job to its SQS queue.
#
# Calls Check#exist! for the job first (presumably this raises when the queue
# is missing — confirm in Check), then sends the JSON-serialized job together
# with either the FIFO or the standard extra parameters.
#
# @return [Object] whatever sqs.send_message returns
def enqueue
  Check.new(@job).exist!

  message = {queue_url: queue_url, message_body: JSON.dump(job.serialize)}
  extras = fifo? ? fifo_params : standard_params
  message.merge!(extras)

  log.info "Enqueueing job to #{queue_url}"
  log.debug " with params: #{message.inspect}"
  sqs.send_message(message)
end
|
#fifo? ⇒ Boolean
33
34
35
|
# File 'lib/jets/rails/job/queue.rb', line 33
# Tells whether the job's queue is a FIFO queue, judged by the queue URL.
#
# @return [Boolean] true when the queue URL ends with ".fifo"
def fifo?
  # Check the suffix only, so a ".fifo" elsewhere in the URL (for example in a
  # custom endpoint host name) is not mistaken for a FIFO queue.
  queue_url.end_with?(".fifo")
end
|
#fifo_params ⇒ Object
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/jets/rails/job/queue.rb', line 42
# Builds the extra send parameters used when the queue is FIFO.
#
# The group id comes from the job when it responds to message_group_id and
# returns a truthy value; otherwise the fixed group "jets" is used.
#
# @return [Hash] with :message_deduplication_id and :message_group_id
def fifo_params
  dedup_id = message_deduplication_id
  group_id = (@job.message_group_id if @job.respond_to?(:message_group_id)) || "jets"

  {message_deduplication_id: dedup_id, message_group_id: group_id}
end
|
#message_deduplication_id ⇒ Object
Note that `job_id` is NOT included in the deduplication keys because it is unique for each initialization of the job, and run-once behavior must be guaranteed across ActiveJob retries. Even if `job_id` is not listed in the job's excluded deduplication keys, it is implicitly excluded.
56
57
58
59
60
61
62
63
|
# File 'lib/jets/rails/job/queue.rb', line 56
# Computes the deduplication id for the job: a SHA-256 hex digest of the
# JSON-dumped serialized job, minus the excluded keys.
#
# Excluded keys are taken from the job's excluded_deduplication_keys (when the
# job responds to it) and always include "job_id". Keys are compared by their
# string form, so symbols and strings are interchangeable.
#
# @return [String] hex digest of the remaining payload
def message_deduplication_id
  custom_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys)

  # Array() tolerates nil or a single key; to_s lets :enqueued_at match "enqueued_at".
  excluded = Array(custom_keys).map(&:to_s) | ["job_id"]

  payload = @job.serialize.reject { |key, _value| excluded.include?(key.to_s) }
  Digest::SHA256.hexdigest(JSON.dump(payload))
end
|
#queue_url ⇒ Object
37
38
39
|
# File 'lib/jets/rails/job/queue.rb', line 37
# Looks up the queue URL for the job's queue name via Url.queue_url.
#
# @return [Object] whatever Url.queue_url returns for @job.queue_name
def queue_url
  name = @job.queue_name
  Url.queue_url(name)
end
|
#sqs ⇒ Object
77
78
79
|
# File 'lib/jets/rails/job/queue.rb', line 77
# Creates an SQS client with default configuration.
#
# NOTE(review): as written here a new client is built on every call; any
# memoization would have to be applied outside this method — confirm.
#
# @return [Aws::SQS::Client]
def sqs
  client = Aws::SQS::Client.new
  client
end
|
#standard_params ⇒ Object
65
66
67
|
# File 'lib/jets/rails/job/queue.rb', line 65
# Builds the extra send parameters used when the queue is a standard one.
#
# @return [Hash] empty when no timestamp is set, otherwise
#   {delay_seconds: delay_seconds}
def standard_params
  return {} unless @timestamp

  {delay_seconds: delay_seconds}
end
|