Class: ActiveJob::QueueAdapters::SqsAdapter
- Inherits: Object
  - Object
    - ActiveJob::QueueAdapters::SqsAdapter
- Defined in: lib/active_job/queue_adapters/sqs_adapter.rb, lib/active_job/queue_adapters/sqs_adapter/params.rb
Overview
Synchronous adapter for Amazon SQS ActiveJob. This adapter queues jobs synchronously (i.e., the call blocks until the message has been sent).
To use this adapter, set the queue adapter in your Rails configuration:
config.active_job.queue_adapter = :sqs
Direct Known Subclasses
Defined Under Namespace
Classes: Params
Instance Method Summary
- #enqueue(job) ⇒ Object
- #enqueue_after_transaction_commit? ⇒ Boolean
- #enqueue_all(jobs) ⇒ Object
- #enqueue_at(job, timestamp) ⇒ Object
Instance Method Details
#enqueue(job) ⇒ Object
# File 'lib/active_job/queue_adapters/sqs_adapter.rb', line 18
def enqueue(job)
  _enqueue(job)
end
#enqueue_after_transaction_commit? ⇒ Boolean
# File 'lib/active_job/queue_adapters/sqs_adapter.rb', line 13
def enqueue_after_transaction_commit?
  # can be removed after Rails 8
  true
end
#enqueue_all(jobs) ⇒ Object
# File 'lib/active_job/queue_adapters/sqs_adapter.rb', line 27
def enqueue_all(jobs)
  enqueued_count = 0
  jobs.group_by(&:queue_name).each do |queue_name, same_queue_jobs|
    queue_url = Aws::ActiveJob::SQS.config.queue_url_for(queue_name)
    base_send_message_opts = { queue_url: queue_url }

    # SQS batch requests accept at most 10 entries
    same_queue_jobs.each_slice(10) do |chunk|
      entries = chunk.map do |job|
        entry = Params.new(job, nil).entry
        entry[:id] = job.job_id
        entry[:delay_seconds] = Params.assured_delay_seconds(job.scheduled_at) if job.scheduled_at
        entry
      end

      send_message_opts = base_send_message_opts.deep_dup
      send_message_opts[:entries] = entries

      send_message_result = Aws::ActiveJob::SQS.config.client.send_message_batch(send_message_opts)
      enqueued_count += send_message_result.successful.count
    end
  end
  enqueued_count
end
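The grouping and slicing that #enqueue_all performs can be tried out without an AWS client. The following is a minimal sketch using only the Ruby standard library: `Job`, `SQS_BATCH_LIMIT`, and `plan_batches` are hypothetical names introduced for illustration and are not part of the adapter. It only plans which job ids would travel together in one batch request; it sends nothing.

```ruby
# Hypothetical stand-in for an ActiveJob instance (only the fields used here).
Job = Struct.new(:job_id, :queue_name)

# Assumption for this sketch: one batch request carries at most 10 entries,
# matching the each_slice(10) call in #enqueue_all.
SQS_BATCH_LIMIT = 10

# Returns an array of [queue_name, [job_id, ...]] pairs, one per batch request.
def plan_batches(jobs, limit: SQS_BATCH_LIMIT)
  jobs.group_by(&:queue_name).flat_map do |queue_name, same_queue_jobs|
    same_queue_jobs.each_slice(limit).map do |chunk|
      [queue_name, chunk.map(&:job_id)]
    end
  end
end

# Twelve jobs on one queue plus one job on another queue.
jobs = Array.new(12) { |i| Job.new("a#{i}", 'default') } + [Job.new('b0', 'mailers')]

plan_batches(jobs).each do |queue_name, ids|
  puts "#{queue_name}: #{ids.size} entries"
end
```

With this input the plan contains three requests: two for `default` (10 and 2 entries) and one for `mailers` (1 entry). Jobs on different queues never share a request because each request targets a single queue URL.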
#enqueue_at(job, timestamp) ⇒ Object
# File 'lib/active_job/queue_adapters/sqs_adapter.rb', line 22
def enqueue_at(job, timestamp)
  delay = Params.assured_delay_seconds(timestamp)
  _enqueue(job, nil, delay_seconds: delay)
end
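#enqueue_at turns an absolute timestamp into a relative `delay_seconds` value. The body of `Params.assured_delay_seconds` is not shown on this page, so the following is only a sketch of what such a conversion might look like. `delay_seconds_for` and `MAX_SQS_DELAY_SECONDS` are hypothetical names; the 900-second ceiling is the documented SQS maximum for per-message delay. Whether the gem raises or clamps when the delay is out of range is an assumption here (this sketch raises).

```ruby
# SQS allows a per-message delay of at most 900 seconds (15 minutes).
MAX_SQS_DELAY_SECONDS = 900

# Hypothetical helper, not the gem's Params.assured_delay_seconds.
# Accepts a Time or a numeric epoch timestamp and returns whole seconds.
def delay_seconds_for(timestamp, now: Time.now)
  delay = (timestamp.to_f - now.to_f).floor
  delay = 0 if delay.negative? # timestamps in the past mean "send immediately"
  if delay > MAX_SQS_DELAY_SECONDS
    raise ArgumentError,
          "delay of #{delay}s exceeds the SQS maximum of #{MAX_SQS_DELAY_SECONDS}s"
  end
  delay
end

# A fixed reference time keeps the example deterministic.
reference = Time.at(1_700_000_000)
puts delay_seconds_for(reference + 300, now: reference) # five minutes ahead
puts delay_seconds_for(reference - 60, now: reference)  # one minute in the past
```

Passing `now:` explicitly is a design choice for testability; a caller inside an application would normally rely on the default.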