Class: Jets::Rails::Job::Queue

Inherits:
Object
  • Object
show all
Extended by:
Memoist
Includes:
Util::Logging
Defined in:
lib/jets/rails/job/queue.rb,
lib/jets/rails/job/queue/url.rb,
lib/jets/rails/job/queue/check.rb

Defined Under Namespace

Classes: Check, Error, Url

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job, timestamp = nil) ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
# File 'lib/jets/rails/job/queue.rb', line 11

def initialize(job, timestamp = nil)
  @job = job
  @timestamp = timestamp
end

Instance Attribute Details

#jobObject (readonly)

Returns the value of attribute job.



10
11
12
# File 'lib/jets/rails/job/queue.rb', line 10

def job
  @job
end

#timestampObject (readonly)

Returns the value of attribute timestamp.



10
11
12
# File 'lib/jets/rails/job/queue.rb', line 10

def timestamp
  @timestamp
end

Instance Method Details

#delay_secondsObject

Raises:

  • (ArgumentError)


69
70
71
72
73
74
75
# File 'lib/jets/rails/job/queue.rb', line 69

def delay_seconds
  delay = (@timestamp - Time.now.to_f).floor
  delay = 0 if delay.negative?
  raise ArgumentError, "Unable to queue a job with a delay greater than 15 minutes" if delay > 15.minutes

  delay
end

#enqueueObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/jets/rails/job/queue.rb', line 16

def enqueue
  Check.new(@job).exist!

  params = {
    queue_url: queue_url,
    message_body: JSON.dump(job.serialize)
  }
  if fifo?
    params.merge!(fifo_params)
  else
    params.merge!(standard_params)
  end
  log.info "Enqueueing job to #{queue_url}"
  log.debug "  with params: #{params.inspect}"
  sqs.send_message(params)
end

#fifo?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/jets/rails/job/queue.rb', line 33

def fifo?
  queue_url.include?(".fifo")
end

#fifo_paramsObject



42
43
44
45
46
47
48
49
50
51
# File 'lib/jets/rails/job/queue.rb', line 42

def fifo_params
  options = {}
  options[:message_deduplication_id] = message_deduplication_id

  message_group_id = @job.message_group_id if @job.respond_to?(:message_group_id)
  message_group_id ||= "jets"

  options[:message_group_id] = message_group_id
  options
end

#message_deduplication_idObject

Note that ‘job_id` is NOT included in deduplication keys because it is unique for each initialization of the job, and the run-once behavior must be guaranteed for ActiveJob retries. Even without setting job_id, it is implicitly excluded from deduplication keys.



56
57
58
59
60
61
62
63
# File 'lib/jets/rails/job/queue.rb', line 56

def message_deduplication_id
  ex_dedup_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys)
  ex_dedup_keys ||= []
  ex_dedup_keys += ["job_id"]
  body = @job.serialize
  deduplication_body = body.except(*ex_dedup_keys)
  Digest::SHA256.hexdigest(JSON.dump(deduplication_body))
end

#queue_urlObject



37
38
39
# File 'lib/jets/rails/job/queue.rb', line 37

def queue_url
  Url.queue_url(@job.queue_name)
end

#sqsObject



77
78
79
# File 'lib/jets/rails/job/queue.rb', line 77

def sqs
  Aws::SQS::Client.new
end

#standard_paramsObject



65
66
67
# File 'lib/jets/rails/job/queue.rb', line 65

def standard_params
  @timestamp ? {delay_seconds: delay_seconds} : {}
end