Module: Aws::Rails::SqsActiveJob
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/aws/rails/sqs_active_job.rb,
lib/aws/rails/sqs_active_job/poller.rb,
lib/aws/rails/sqs_active_job/executor.rb,
lib/aws/rails/sqs_active_job/job_runner.rb,
lib/aws/rails/sqs_active_job/configuration.rb,
lib/aws/rails/sqs_active_job/deduplication.rb,
lib/aws/rails/sqs_active_job/lambda_handler.rb
Overview
A lambda event handler to run jobs from an SQS queue trigger Trigger the lambda from your SQS queue Configure the entrypoint to: config/environment.Aws::Rails::SqsActiveJob.lambda_job_handler
This will load your Rails environment, and then use this method as the handler.
Defined Under Namespace
Modules: ClassMethods
Classes: Configuration, Executor, Interrupt, JobRunner, Poller
Class Method Summary
collapse
Class Method Details
Returns the (singleton) Configuration.
19
20
21
|
# File 'lib/aws/rails/sqs_active_job.rb', line 19
def self.config
@config ||= Configuration.new
end
|
24
25
26
|
# File 'lib/aws/rails/sqs_active_job.rb', line 24
def self.configure
yield(config)
end
|
.fifo?(queue_url) ⇒ Boolean
28
29
30
|
# File 'lib/aws/rails/sqs_active_job.rb', line 28
def self.fifo?(queue_url)
queue_url.ends_with? '.fifo'
end
|
.lambda_job_handler(event:, context:) ⇒ Object
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 12
def self.lambda_job_handler(event:, context:)
return 'no records to process' unless event['Records']
event['Records'].each do |record|
sqs_msg = to_sqs_msg(record)
job = Aws::Rails::SqsActiveJob::JobRunner.new(sqs_msg)
puts("Running job: #{job.id}[#{job.class_name}]")
job.run
sqs_msg.delete
end
"Processed #{event['Records'].length} jobs."
end
|
.to_message_attributes(record) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 41
def self.to_message_attributes(record)
record['messageAttributes'].transform_values do |value|
{
string_value: value['stringValue'],
binary_value: value['binaryValue'],
string_list_values: ['stringListValues'],
binary_list_values: value['binaryListValues'],
data_type: value['dataType']
}
end
end
|
.to_queue_url(record) ⇒ Object
53
54
55
56
57
58
59
60
|
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 53
def self.to_queue_url(record)
source_arn = record['eventSourceARN']
raise ArgumentError, "Invalid queue arn: #{source_arn}" unless Aws::ARNParser.arn?(source_arn)
arn = Aws::ARNParser.parse(source_arn)
sfx = Aws::Partitions::EndpointProvider.dns_suffix_for(arn.region)
"https://sqs.#{arn.region}.#{sfx}/#{arn.account_id}/#{arn.resource}"
end
|
.to_sqs_msg(record) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/aws/rails/sqs_active_job/lambda_handler.rb', line 25
def self.to_sqs_msg(record)
msg = Aws::SQS::Types::Message.new(
body: record['body'],
md5_of_body: record['md5OfBody'],
message_attributes: to_message_attributes(record),
message_id: record['messageId'],
receipt_handle: record['receiptHandle']
)
Aws::SQS::Message.new(
queue_url: to_queue_url(record),
receipt_handle: msg.receipt_handle,
data: msg,
client: Aws::Rails::SqsActiveJob.config.client
)
end
|