Module: Aws::Rails::SqsActiveJob

Extended by:
ActiveSupport::Concern
Defined in:
lib/aws/rails/sqs_active_job.rb,
lib/aws/rails/sqs_active_job/poller.rb,
lib/aws/rails/sqs_active_job/executor.rb,
lib/aws/rails/sqs_active_job/job_runner.rb,
lib/aws/rails/sqs_active_job/configuration.rb,
lib/aws/rails/sqs_active_job/deduplication.rb,
lib/aws/rails/sqs_active_job/lambda_handler.rb

Overview

A lambda event handler to run jobs from an SQS queue trigger Trigger the lambda from your SQS queue Configure the entrypoint to: config/environment.Aws::Rails::SqsActiveJob.lambda_job_handler This will load your Rails environment, and then use this method as the handler.

Defined Under Namespace

Modules: ClassMethods Classes: Configuration, Executor, Interrupt, JobRunner, Poller

Class Method Summary collapse

Class Method Details

.configConfiguration

Returns the (singleton) Configuration.

Returns:



19
20
21
# File 'lib/aws/rails/sqs_active_job.rb', line 19

def self.config
  @config ||= Configuration.new
end

.configure { ... } ⇒ Object

Yields:

  • Configuration



24
25
26
# File 'lib/aws/rails/sqs_active_job.rb', line 24

def self.configure
  yield(config)
end

.fifo?(queue_url) ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/aws/rails/sqs_active_job.rb', line 28

def self.fifo?(queue_url)
  queue_url.ends_with? '.fifo'
end

.lambda_job_handler(event:, context:) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 12

def self.lambda_job_handler(event:, context:)
  return 'no records to process' unless event['Records']

  event['Records'].each do |record|
    sqs_msg = to_sqs_msg(record)
    job = Aws::Rails::SqsActiveJob::JobRunner.new(sqs_msg)
    puts("Running job: #{job.id}[#{job.class_name}]")
    job.run
    sqs_msg.delete
  end
  "Processed #{event['Records'].length} jobs."
end

.to_message_attributes(record) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 41

def self.to_message_attributes(record)
  record['messageAttributes'].transform_values do |value|
    {
      string_value: value['stringValue'],
      binary_value: value['binaryValue'],
      string_list_values: ['stringListValues'],
      binary_list_values: value['binaryListValues'],
      data_type: value['dataType']
    }
  end
end

.to_queue_url(record) ⇒ Object

Raises:

  • (ArgumentError)


53
54
55
56
57
58
59
60
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 53

def self.to_queue_url(record)
  source_arn = record['eventSourceARN']
  raise ArgumentError, "Invalid queue arn: #{source_arn}" unless Aws::ARNParser.arn?(source_arn)

  arn = Aws::ARNParser.parse(source_arn)
  sfx = Aws::Partitions::EndpointProvider.dns_suffix_for(arn.region)
  "https://sqs.#{arn.region}.#{sfx}/#{arn.}/#{arn.resource}"
end

.to_sqs_msg(record) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 25

def self.to_sqs_msg(record)
  msg = Aws::SQS::Types::Message.new(
    body: record['body'],
    md5_of_body: record['md5OfBody'],
    message_attributes: to_message_attributes(record),
    message_id: record['messageId'],
    receipt_handle: record['receiptHandle']
  )
  Aws::SQS::Message.new(
    queue_url: to_queue_url(record),
    receipt_handle: msg.receipt_handle,
    data: msg,
    client: Aws::Rails::SqsActiveJob.config.client
  )
end