Module: Sidekiq::Sqs::Client::ClassMethods
- Defined in:
- lib/sidekiq-sqs/client.rb
Constant Summary
- MAX_BULK_RETRIES = 5
Instance Method Summary
- #bulk_send_to_sqs(queue, formatted_items) ⇒ Object
- #clear_queue(queue_name) ⇒ Object
- #environmental_queue_name(queue_name) ⇒ Object
- #format_items(items) ⇒ Object
- #process_single_with_sqs(worker_class, item) ⇒ Object
- #push(item) ⇒ Object
- #push_bulk(items) ⇒ Object
- #queue_or_create(queue) ⇒ Object
- #send_batch_to_sqs(queue, formatted_items) ⇒ Object
Instance Method Details
#bulk_send_to_sqs(queue, formatted_items) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/sidekiq-sqs/client.rb', line 110
#
# Sends the given entries to the queue in slices of at most 10, delegating
# each slice to #send_batch_to_sqs and merging the per-slice results.
#
# @param queue [Object] queue handle passed through to #send_batch_to_sqs
# @param formatted_items [Enumerable] entries to send
# @return [Array(Array, Array)] a pair: entries that failed outright, and
#   entries that failed but may be retried
def bulk_send_to_sqs(queue, formatted_items)
  # 10 per slice -- presumably the SQS per-request batch limit; confirm
  # before changing.
  formatted_items.each_slice(10).each_with_object([[], []]) do |batch, (rejected, retryable)|
    batch_rejected, batch_retryable = send_batch_to_sqs(queue, batch)
    rejected.concat(batch_rejected)
    retryable.concat(batch_retryable)
  end
end
#clear_queue(queue_name) ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/sidekiq-sqs/client.rb', line 48
#
# Drains the named queue by receiving and deleting messages until a receive
# call yields nothing, then returns the queue handle.
#
# NOTE(review): the extracted listing had lost the `message` identifiers
# (`while = queue. .delete`); the loop below is reconstructed on the
# assumption that the queue exposes `receive_message` returning nil when no
# message is available -- confirm against lib/sidekiq-sqs/client.rb.
#
# @param queue_name [String, Symbol] queue name, with or without the
#   environment prefix
# @return [Object] the queue handle returned by #queue_or_create
def clear_queue(queue_name)
  queue = queue_or_create(environmental_queue_name(queue_name))

  while (message = queue.receive_message)
    message.delete
  end

  queue
end
#environmental_queue_name(queue_name) ⇒ Object
38 39 40 41 42 43 44 45 46 |
# File 'lib/sidekiq-sqs/client.rb', line 38
#
# Namespaces a queue name with the current Rails environment
# ("<env>_<name>"). Outside Rails, or when the name already carries the
# prefix, the argument is returned untouched.
#
# The prefix is compared literally at the very start of the name, so an
# environment name containing regexp metacharacters or a queue name with an
# embedded newline cannot produce a false "already prefixed" match.
#
# @param queue_name [String, Symbol] queue name to namespace
# @return [String, Symbol] the original argument when no prefix is needed,
#   otherwise a new prefixed String
def environmental_queue_name(queue_name)
  return queue_name unless defined?(::Rails)

  prefix = "#{::Rails.env}_"
  return queue_name if queue_name.to_s.start_with?(prefix)

  "#{prefix}#{queue_name}"
end
#format_items(items) ⇒ Object
100 101 102 103 104 105 106 107 108 |
# File 'lib/sidekiq-sqs/client.rb', line 100
#
# Expands a bulk job description into one payload per argument list.
#
# The description is normalized once; each entry of items['args'] is then
# merged into that template together with a freshly generated 'jid' and
# run through process_single. Entries for which process_single yields no
# payload are left out.
#
# @param items [Hash] bulk description; 'class' and 'args' are read here
# @return [Array(Object, Array)] the normalized queue name and the payloads
def format_items(items)
  template = normalize_item(items)

  payloads = items['args'].each_with_object([]) do |job_args, collected|
    _job, payload = process_single(
      items['class'],
      template.merge('args' => job_args, 'jid' => SecureRandom.hex(12))
    )
    collected << payload unless payload.nil?
  end

  [template['queue'], payloads]
end
#process_single_with_sqs(worker_class, item) ⇒ Object
149 150 151 152 153 |
# File 'lib/sidekiq-sqs/client.rb', line 149
#
# Wraps process_single_without_sqs and converts its payload into the form
# stored in the queue: deflate-compressed, then Base64-encoded.
#
# When the wrapped call produces no payload, the nil is passed through
# instead of being handed to Zlib (which would raise TypeError). Callers in
# this module already expect that case: #push checks the returned item and
# #format_items compacts nil payloads.
# NOTE(review): presumably a nil result means client middleware halted the
# push -- confirm against process_single_without_sqs.
#
# @param worker_class [Object] forwarded to process_single_without_sqs
# @param item [Hash] job description forwarded to process_single_without_sqs
# @return [Array(Object, String)] the processed item and encoded payload,
#   or a pair whose payload is nil when nothing was produced
def process_single_with_sqs(worker_class, item)
  item, payload = process_single_without_sqs(worker_class, item)
  return [item, nil] if payload.nil?

  [item, Base64.encode64(Zlib::Deflate.deflate(payload))]
end
#push(item) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/sidekiq-sqs/client.rb', line 58
#
# Pushes a single job onto its queue.
#
# The item is normalized and run through process_single; if that yields no
# item, nothing is sent and nil is returned. A job carrying an 'at'
# timestamp is sent with a delivery delay equal to the rounded number of
# seconds until that time. A timestamp already in the past is sent with a
# delay of 0 rather than a negative value, which lies outside the 0..900
# range named in the error message below.
#
# NOTE(review): the extracted listing had lost the method name in
# `queue.(payload, ...)`; `send_message` is reconstructed here -- confirm
# against lib/sidekiq-sqs/client.rb.
#
# @param item [Hash] job description; 'class' is read here, 'queue', 'at'
#   and 'jid' are read from the processed item
# @return [Object, nil] the job's 'jid' when the send call returned a truthy
#   value, otherwise nil
# @raise [RuntimeError] when the requested delay exceeds 900 seconds
def push(item)
  normed = normalize_item(item)
  normed, payload = process_single(item['class'], normed)
  return nil unless normed

  pushed =
    if normed['at']
      delay_seconds = (normed['at'] - Time.now.to_f).round
      raise "The number of seconds to delay should be from 0 to 900 (15 mins)." if delay_seconds > 900

      # Clamp so a slightly stale schedule time is delivered immediately.
      delay_seconds = 0 if delay_seconds < 0
      queue_or_create(normed['queue']).send_message(payload, { delay_seconds: delay_seconds })
    else
      queue_or_create(normed['queue']).send_message(payload)
    end

  pushed ? normed['jid'] : nil
end
#push_bulk(items) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/sidekiq-sqs/client.rb', line 74
#
# Pushes many jobs in one call.
#
# The items are expanded by #format_items and sent with #bulk_send_to_sqs.
# Entries reported as retryable are re-sent up to MAX_BULK_RETRIES times,
# sleeping 0, 1, 4, 9, ... seconds between consecutive attempts. Anything
# still unsent afterwards -- permanent failures from any attempt, plus
# entries that were still retryable when the retry budget ran out -- is
# reported through BulkInsertionError.
#
# NOTE(review): retries re-submit the failure records returned by
# #bulk_send_to_sqs, not the original payloads; verify that the queue's
# batch send accepts that shape.
#
# @param items [Hash] bulk job description accepted by #format_items
# @return [Integer] number of payloads sent
# @raise [BulkInsertionError] when at least one entry could not be sent;
#   the second constructor argument holds every failed entry
def push_bulk(items)
  queue_name, payloads = format_items(items)
  queue = queue_or_create(queue_name)
  failures, can_retry = bulk_send_to_sqs(queue, payloads)

  attempts = 0
  until can_retry.empty? || attempts >= MAX_BULK_RETRIES
    # Quadratic back-off between retries; no wait before the first one.
    sleep((attempts - 1)**2) if attempts > 0

    failed, can_retry = bulk_send_to_sqs(queue, can_retry)
    failures.concat(failed)
    attempts += 1
  end

  # Entries still marked retryable here were never delivered.
  failures.concat(can_retry)

  unless failures.empty?
    raise BulkInsertionError.new("Some messages failed to insert", failures)
  end

  payloads.size
end
#queue_or_create(queue) ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/sidekiq-sqs/client.rb', line 140
#
# Looks up the environment-prefixed queue by name, creating it when the
# lookup reports that no such queue exists.
#
# @param queue [String, Symbol] queue name, with or without the
#   environment prefix
# @return [Object] the queue handle returned by the lookup or by creation
def queue_or_create(queue)
  full_name = environmental_queue_name(queue).to_s

  begin
    Sidekiq.sqs.queues.named(full_name)
  rescue AWS::SQS::Errors::NonExistentQueue
    # First use of this queue: create it on demand.
    Sidekiq.sqs.queues.create(full_name)
  end
end
#send_batch_to_sqs(queue, formatted_items) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/sidekiq-sqs/client.rb', line 123
#
# Sends one batch to the queue and classifies whatever could not be sent.
#
# A batch that goes through cleanly yields two empty lists. When the send
# raises BatchSendError, its failure records are split by :error_code:
# codes listed in RETRYABLE_ERRORS are retryable, all others are permanent.
#
# @param queue [Object] queue handle responding to #batch_send
# @param formatted_items [Array] entries making up the batch
# @return [Array(Array, Array)] a pair: permanent failures, then retryable
#   failures
def send_batch_to_sqs(queue, formatted_items)
  queue.batch_send(formatted_items)
  [[], []]
rescue AWS::SQS::Errors::BatchSendError => batch_error
  retryable, permanent = batch_error.failures.partition do |record|
    RETRYABLE_ERRORS.include?(record[:error_code])
  end

  [permanent, retryable]
end