Class: Announce::Adapters::ShoryukenAdapter::Queue

Inherits:
BaseAdapter::Queue show all
Defined in:
lib/announce/adapters/shoryuken_adapter.rb

Constant Summary collapse

DLQ_SUFFIX =
"failures".freeze

Instance Attribute Summary

Attributes inherited from BaseAdapter::Destination

#action, #options, #subject

Instance Method Summary collapse

Methods inherited from BaseAdapter::Queue

name_for

Methods inherited from BaseAdapter::Destination

app, delimiter, #initialize, #name, name_for, namespace, prefix, #publish

Constructor Details

This class inherits a constructor from Announce::Adapters::BaseAdapter::Destination

Instance Method Details

#arnObject



160
161
162
163
164
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 160

def arn
   = Shoryuken::Client.
  region = sqs.config[:region]
  "arn:aws:sqs:#{region}:#{}:#{name}"
end

#createObject



141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 141

def create
  dlq_arn = create_dlq

  create_attributes =
    default_options.merge((options[:queues] || {}).stringify_keys)
  create_attributes["RedrivePolicy"] =
    '{"maxReceiveCount":"10", "deadLetterTargetArn":"' + dlq_arn + '"}'

  sqs.create_queue(queue_name: name, attributes: create_attributes)[
    :queue_url
  ]
end

#create_dlqObject



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 166

def create_dlq
  dlq_options = {
    "MaximumMessageSize" => (256 * 1024).to_s,
    "MessageRetentionPeriod" => (2 * 7 * 24 * 60 * 60).to_s
    # 2 weeks in seconds
  }

  dlq = sqs.create_queue(queue_name: dlq_name, attributes: dlq_options)

  attrs =
    sqs.get_queue_attributes(
      queue_url: dlq[:queue_url], attribute_names: %w[QueueArn]
    )

  attrs.attributes["QueueArn"]
end

#default_optionsObject



191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 191

def default_options
  {
    "DelaySeconds" => "0",
    "MaximumMessageSize" => (256 * 1024).to_s,
    "VisibilityTimeout" => (60 * 60).to_s,
    # 1 hour in seconds
    "ReceiveMessageWaitTimeSeconds" => "0",
    "MessageRetentionPeriod" => (7 * 24 * 60 * 60).to_s,
    # 1 week in seconds
    "Policy" => policy
  }
end

#dlq_arnObject



183
184
185
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 183

def dlq_arn
  [arn, DLQ_SUFFIX].join(self.class.delimiter)
end

#dlq_nameObject



187
188
189
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 187

def dlq_name
  [name, DLQ_SUFFIX].join(self.class.delimiter)
end

#policyObject



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 204

def policy
   = Shoryuken::Client.
  region = sqs.config[:region]
  {
    "Version" => "2012-10-17",
    "Id" => "AnnounceSNStoSQS",
    "Statement" => [
      {
        "Sid" => "1",
        "Effect" => "Allow",
        "Principal" => { "AWS" => "*" },
        "Action" => "sqs:*",
        "Resource" => arn,
        "Condition" => {
          "ArnLike" => {
            "aws:SourceArn" => "arn:aws:sns:#{region}:#{}:*"
          }
        }
      }
    ]
  }.to_json
end

#sqsObject



227
228
229
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 227

def sqs
  Shoryuken::Client.sqs
end

#verifyObject



154
155
156
157
158
# File 'lib/announce/adapters/shoryuken_adapter.rb', line 154

def verify
  Announce.logger.warn(
    "Verify SQS Queue: #{arn}\n\t with DLQ: #{dlq_arn}"
  )
end