Class: Aws::ActiveJob::SQS::Configuration

Inherits:
Object
  • Object
Defined in:
lib/aws/active_job/sqs/configuration.rb

Overview

This class provides a Configuration object for AWS ActiveJob by pulling configuration options from runtime code, the ENV, a YAML file, and default settings, in that order. Values set on queues are used preferentially to global values.

Use Aws::ActiveJob::SQS.config to access the singleton config instance and use Aws::ActiveJob::SQS.configure to configure in code:

Aws::ActiveJob::SQS.configure do |config|
  config.logger = Rails.logger
  config.max_messages = 5
end

Configuration YAML File

By default, this class loads configuration from `config/aws_active_job_sqs/<RAILS_ENV>.yml` or `config/aws_active_job_sqs.yml`. You may specify the file to use through the `:config_file` option in code or the `AWS_ACTIVE_JOB_SQS_CONFIG_FILE` environment variable.
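
As a rough illustration of what such a file might contain, the sketch below parses an inline YAML document with Ruby's standard `YAML` library. The key layout (global options at the top level, queue specific options nested under `queues`) mirrors the option names documented for this class but is an assumption, as are the queue name and URL.

```ruby
require 'yaml'

# Hypothetical contents of config/aws_active_job_sqs.yml (assumed layout).
yaml_text = <<~YAML
  max_messages: 5
  shutdown_timeout: 15
  queues:
    default:
      url: https://my-queue.aws
      max_messages: 2
YAML

# Parse into a Hash with symbol keys, similar in shape to the options hash.
file_options = YAML.safe_load(yaml_text, symbolize_names: true)

puts file_options[:max_messages]
puts file_options[:queues][:default][:url]
```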

Global and queue specific options

Values configured for specific queues take precedence over global values. See QUEUE_CONFIGS for supported queue specific options.
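
The precedence rule can be pictured with plain hashes. This is a minimal sketch, not the library's lookup code: `option_for` is a hypothetical helper, and the option values are made up for illustration.

```ruby
# Hypothetical helper: return the queue specific value when one is set,
# otherwise fall back to the global value.
def option_for(config, queue, key)
  config.fetch(:queues, {}).fetch(queue, {}).fetch(key) { config[key] }
end

config = {
  max_messages: 10,
  visibility_timeout: 60,
  queues: {
    default: { url: 'https://my-queue.aws', max_messages: 2 }
  }
}

puts option_for(config, :default, :max_messages)       # queue value is used
puts option_for(config, :default, :visibility_timeout) # global value is used
```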

Environment Variables

The Configuration loads global and queue specific values from your environment. Global keys take the form `AWS_ACTIVE_JOB_SQS_<KEY_NAME>` and queue specific keys take the form `AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>`. <QUEUE_NAME> is case-insensitive and is always downcased. Configuring queues whose names are not snake case (that is, names containing upper case characters) through ENV is not supported.

Example:

export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES=5
export AWS_ACTIVE_JOB_SQS_DEFAULT_URL=https://my-queue.aws

For supported global ENV configurations see GLOBAL_ENV_CONFIGS. For supported queue specific ENV configurations see: QUEUE_ENV_CONFIGS.
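
To make the queue key format concrete, the following sketch applies the QUEUE_KEY_REGEX listed in the constants below to an ENV-like Hash. The constants are copied from this page; `queue_env_options` is a hypothetical helper, and values are left as strings, which may differ from any type conversion the library itself performs.

```ruby
QUEUE_ENV_CONFIGS = %i[url max_messages visibility_timeout message_group_id].freeze
QUEUE_KEY_REGEX =
  /AWS_ACTIVE_JOB_SQS_([\w]+)_(#{QUEUE_ENV_CONFIGS.map(&:upcase).join('|')})/.freeze

# Hypothetical helper: collect queue specific options from an ENV-like Hash.
def queue_env_options(env)
  env.each_with_object({}) do |(key, value), queues|
    match = QUEUE_KEY_REGEX.match(key)
    next unless match # global keys and unrelated variables are skipped

    queue = match[1].downcase.to_sym   # queue names are always downcased
    option = match[2].downcase.to_sym
    (queues[queue] ||= {})[option] = value
  end
end

env = {
  'AWS_ACTIVE_JOB_SQS_MAX_MESSAGES' => '5',              # global key, no queue name
  'AWS_ACTIVE_JOB_SQS_DEFAULT_URL' => 'https://my-queue.aws',
  'AWS_ACTIVE_JOB_SQS_DEFAULT_MAX_MESSAGES' => '2'
}

p queue_env_options(env)
```

In this sketch only the two `DEFAULT` keys are recognized as queue specific; the global `MAX_MESSAGES` key does not match the pattern because nothing precedes the key name.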

Constant Summary

DEFAULTS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Default configuration options

{
  threads: 2 * Concurrent.processor_count,
  backpressure: 10,
  max_messages: 10,
  shutdown_timeout: 15,
  queues: Hash.new { |h, k| h[k] = {} },
  message_group_id: 'ActiveJobSqsGroup',
  excluded_deduplication_keys: ['job_id']
}.freeze

GLOBAL_ENV_CONFIGS =

%i[
  config_file
  threads
  backpressure
  max_messages
  shutdown_timeout
  visibility_timeout
  message_group_id
].freeze

QUEUE_ENV_CONFIGS =

%i[
  url
  max_messages
  visibility_timeout
  message_group_id
].freeze

QUEUE_CONFIGS =

QUEUE_ENV_CONFIGS + %i[excluded_deduplication_keys]

QUEUE_KEY_REGEX =

/AWS_ACTIVE_JOB_SQS_([\w]+)_(#{QUEUE_ENV_CONFIGS.map(&:upcase).join('|')})/.freeze

Constructor Details

#initialize(options = {}) ⇒ Configuration

Don’t use this method directly: Configuration is a singleton class. Use Aws::ActiveJob::SQS.config to access the singleton config instance and Aws::ActiveJob::SQS.configure to configure in code.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :queues (Hash<Symbol, Hash>)

    A mapping between the active job queue name and the queue properties. Values configured on the queue are used preferentially to the global values. See: QUEUE_CONFIGS for supported queue specific options. Note: multiple active job queues can map to the same SQS Queue URL.

  • :max_messages (Integer)

    The max number of messages to poll for in a batch.

  • :visibility_timeout (Integer)

    If unset, the visibility timeout configured on the SQS queue will be used. The visibility timeout is the number of seconds during which a received message cannot be processed by any other consumer. You should set this value to be longer than your expected job runtime to prevent other processes from picking up a running job. See the SQS Visibility Timeout documentation.

  • :shutdown_timeout (Integer)

    The amount of time to wait for a clean shutdown. Jobs that are unable to complete in this time will not be deleted from the SQS queue and will be retryable after the visibility timeout.

  • :poller_error_handler (Callable)

    An error handler to be called when the poller encounters an error running a job. Called with exception, sqs_message. You may re-raise the exception to terminate the poller. You may also choose whether to delete the sqs_message or not. If the message is not explicitly deleted, it will be left on the queue and will be retried (subject to the SQS queue’s redrive/DLQ/maximum receive settings). Retries provided by this mechanism happen after any retries configured on the job with `retry_on`.

  • :logger (ActiveSupport::Logger)

    Logger to use for the poller.

  • :config_file (String)

    Override file to load configuration from. If not specified, configuration is loaded from `config/aws_active_job_sqs/<RAILS_ENV>.yml` or `config/aws_active_job_sqs.yml`.

  • :message_group_id (String) — default: ActiveJobSqsGroup

    The message_group_id to use when queueing messages. Applies only to jobs queued on FIFO queues. See the SQS FIFO documentation.

  • :async_queue_error_handler (Callable)

    An error handler to be called when the async active job adapter experiences an error queueing a job. Only applies when active_job.queue_adapter = :sqs_async. Called with: error, job, job_options.

  • :client (SQS::Client)

    SQS Client to use. A default client will be created if none is provided.

  • :excluded_deduplication_keys (Array) — default: ['job_id']

    The type of keys stored in the array should be String or Symbol. Using this option, job_id is implicitly added to the keys.



# File 'lib/aws/active_job/sqs/configuration.rb', line 152

def initialize(options = {})
  opts = env_options.deep_merge(options)
  opts = file_options(opts).deep_merge(opts)
  opts = DEFAULTS.merge(logger: default_logger).deep_merge(opts)

  apply_attributes(opts)
end
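
The order of the merges above is what gives runtime code priority over ENV, ENV over the YAML file, and the file over the defaults. The sketch below reproduces that order with plain hashes. It is a simplified illustration: `deep_merge` is a small stand-in for ActiveSupport's `Hash#deep_merge`, the option values are invented, and the file lookup performed by `file_options` is replaced by a literal hash.

```ruby
# Minimal stand-in for ActiveSupport's Hash#deep_merge: values from `other`
# win, and nested hashes are merged recursively.
def deep_merge(base, other)
  base.merge(other) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge(old_value, new_value)
    else
      new_value
    end
  end
end

defaults     = { max_messages: 10, shutdown_timeout: 15 }
file_options = { max_messages: 8, queues: { default: { url: 'https://from-file.example' } } }
env_options  = { max_messages: 5 }
code_options = { queues: { default: { max_messages: 2 } } }

opts = deep_merge(env_options, code_options) # code overrides ENV
opts = deep_merge(file_options, opts)        # ENV and code override the file
opts = deep_merge(defaults, opts)            # everything overrides the defaults

p opts
```

Here the global `max_messages` ends up as the ENV value, `shutdown_timeout` keeps its default, and the `default` queue combines the URL from the file with the `max_messages` set in code.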

Instance Attribute Details

#async_queue_error_handler ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 161

def async_queue_error_handler
  @async_queue_error_handler
end

#backpressure ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 161

def backpressure
  @backpressure
end

#client ⇒ Object



# File 'lib/aws/active_job/sqs/configuration.rb', line 178

def client
  @client ||= begin
    client = Aws::SQS::Client.new
    client.config.user_agent_frameworks << 'aws-activejob-sqs'
    client
  end
end

#logger ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 161

def logger
  @logger
end

#max_messages=(value) ⇒ Object (writeonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 166

def max_messages=(value)
  @max_messages = value
end

#message_group_id=(value) ⇒ Object (writeonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 166

def message_group_id=(value)
  @message_group_id = value
end

#poller_error_handler(&block) ⇒ Object



# File 'lib/aws/active_job/sqs/configuration.rb', line 173

def poller_error_handler(&block)
  @poller_error_handler = block if block_given?
  @poller_error_handler
end
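
Because the method stores a block when one is given and returns the stored handler otherwise, it serves as both setter and getter. The sketch below shows that pattern with a stand-in class. `HandlerConfig` is hypothetical and copies only the accessor above; the message is a plain string rather than a real SQS message.

```ruby
# Stand-in class reproducing only the accessor shown above;
# this is not the real Configuration class.
class HandlerConfig
  def poller_error_handler(&block)
    @poller_error_handler = block if block_given?
    @poller_error_handler
  end
end

config = HandlerConfig.new
handled = []

# Register a handler. Per the option description, it is called with the
# exception and the SQS message.
config.poller_error_handler do |exception, sqs_message|
  handled << [exception.message, sqs_message]
end

# Calling without a block returns the stored handler, which can then be invoked.
config.poller_error_handler.call(StandardError.new('boom'), 'fake-message')

p handled
```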

#queues ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 161

def queues
  @queues
end

#shutdown_timeout ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 161

def shutdown_timeout
  @shutdown_timeout
end

#threads ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 161

def threads
  @threads
end

#visibility_timeout=(value) ⇒ Object (writeonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 166

def visibility_timeout=(value)
  @visibility_timeout = value
end

Instance Method Details

#excluded_deduplication_keys=(keys) ⇒ Object



# File 'lib/aws/active_job/sqs/configuration.rb', line 169

def excluded_deduplication_keys=(keys)
  @excluded_deduplication_keys = keys.map(&:to_s) | ['job_id']
end
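
The setter converts every key to a String and then takes the union with `['job_id']`, so `job_id` is always present and duplicates are dropped. A small sketch of the same expression, wrapped in a lambda named `normalize` purely for illustration:

```ruby
# Same expression as the setter body: stringify the keys, then union with 'job_id'.
normalize = ->(keys) { keys.map(&:to_s) | ['job_id'] }

p normalize.call([:job_class, 'queue_name'])
p normalize.call(['job_id', :job_id])
```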

#to_h ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 198

def to_h
  h = {}
  instance_variables.each do |v|
    v_sym = v.to_s.delete('@').to_sym
    val = instance_variable_get(v)
    h[v_sym] = val
  end
  h
end

#to_s ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/active_job/sqs/configuration.rb', line 193

def to_s
  to_h.to_s
end