Module: Sidekiq::Sqs::Client::ClassMethods

Defined in:
lib/sidekiq-sqs/client.rb

Constant Summary collapse

MAX_BULK_RETRIES =
5

Instance Method Summary collapse

Instance Method Details

#bulk_send_to_sqs(queue, formatted_items) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/sidekiq-sqs/client.rb', line 110

def bulk_send_to_sqs(queue, formatted_items)
  failures = []
  can_retry = []
  formatted_items.each_slice(10) do |items|
    failed, retryable = send_batch_to_sqs(queue, items)

    failures.concat failed
    can_retry.concat retryable
  end

  [failures, can_retry]
end

#clear_queue(queue_name) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/sidekiq-sqs/client.rb', line 48

def clear_queue(queue_name)
  queue = queue_or_create(environmental_queue_name queue_name)

  while message = queue.receive_message
    message.delete
  end

  queue
end

#environmental_queue_name(queue_name) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/sidekiq-sqs/client.rb', line 38

def environmental_queue_name(queue_name)
  return queue_name unless defined?(::Rails)

  if queue_name =~ /^#{::Rails.env}_/
    queue_name
  else
    %(#{::Rails.env}_#{queue_name})
  end
end

#format_items(items) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/sidekiq-sqs/client.rb', line 100

def format_items(items)
  normed = normalize_item(items)
  payloads = items['args'].map do |args|
    _, payload = process_single(items['class'], normed.merge('args' => args, 'jid' => SecureRandom.hex(12)))
    payload
  end.compact

  [normed['queue'], payloads]
end

#process_single_with_sqs(worker_class, item) ⇒ Object



149
150
151
152
153
# File 'lib/sidekiq-sqs/client.rb', line 149

def process_single_with_sqs(worker_class, item)
  item, payload = process_single_without_sqs(worker_class, item)

  return item, Base64.encode64(Zlib::Deflate.deflate(payload))
end

#push(item) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/sidekiq-sqs/client.rb', line 58

def push(item)
  normed = normalize_item(item)
  normed, payload = process_single(item['class'], normed)

  pushed = false
  if normed['at']
    delay_seconds = (normed['at'] - Time.now.to_f).round
    raise "The number of seconds to delay should be from 0 to 900 (15 mins)." if delay_seconds > 900
    pushed = queue_or_create(normed['queue']).send_message(payload, { delay_seconds: delay_seconds })
  else
    pushed = queue_or_create(normed['queue']).send_message(payload)
  end if normed
  pushed ? normed['jid'] : nil
end

#push_bulk(items) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/sidekiq-sqs/client.rb', line 74

def push_bulk(items)
  queue_name, payloads = format_items(items)
  queue = queue_or_create(queue_name)

  failures, can_retry = bulk_send_to_sqs(queue, payloads)

  retries = 0
  begin
    if can_retry.size > 0
      failed, can_retry = bulk_send_to_sqs(queue, can_retry)
      failures.concat failed

      raise Retryable if can_retry.size > 0
    end
  rescue Retryable
    sleep retries ** 2
    retry if (retries += 1) < MAX_BULK_RETRIES
  end

  if failures.size > 0
    raise BulkInsertionError.new("Some messages failed to insert", failed)
  end

  failures.empty? ? payloads.size : nil
end

#queue_or_create(queue) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/sidekiq-sqs/client.rb', line 140

def queue_or_create(queue)
  queue = environmental_queue_name(queue)
  begin
    Sidekiq.sqs.queues.named(queue.to_s)
  rescue AWS::SQS::Errors::NonExistentQueue
    Sidekiq.sqs.queues.create(queue.to_s)
  end
end

#send_batch_to_sqs(queue, formatted_items) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/sidekiq-sqs/client.rb', line 123

def send_batch_to_sqs(queue, formatted_items)
  failures, retryables = [], []

  begin
    queue.batch_send(formatted_items)
  rescue AWS::SQS::Errors::BatchSendError => error
    retryable, failed = error.failures.partition do |failure|
      RETRYABLE_ERRORS.include?(failure[:error_code])
    end

    failures.concat failed
    retryables.concat retryable
  end

  [failures, retryables]
end