Class: Pheme::QueuePoller
- Inherits:
-
Object
- Object
- Pheme::QueuePoller
- Includes:
- Compression
- Defined in:
- lib/pheme/queue_poller.rb
Overview
rubocop:disable Metrics/ClassLength
Instance Attribute Summary collapse
-
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
-
#format ⇒ Object
Returns the value of attribute format.
-
#max_messages ⇒ Object
Returns the value of attribute max_messages.
-
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
-
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
-
#queue_url ⇒ Object
Returns the value of attribute queue_url.
Instance Method Summary collapse
- #get_content(body) ⇒ Object
- #get_metadata(message_body) ⇒ Object
- #handle(message, metadata, message_attributes) ⇒ Object
-
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) ⇒ QueuePoller
constructor
A new instance of QueuePoller.
-
#parse_body(queue_message) ⇒ Object
Parses queue_message.body as JSON, parses the content returned by get_content according to format, stores the content in the body hash, and returns the parsed content.
- #parse_csv(message_contents) ⇒ Object
- #parse_json(message_contents) ⇒ Object
- #parse_message_attributes(queue_message) ⇒ Object
- #parse_metadata(queue_message) ⇒ Object
-
#poll ⇒ Object
rubocop:disable Metrics/AbcSize.
Methods included from Compression
Constructor Details
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) ⇒ QueuePoller
Returns a new instance of QueuePoller.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/pheme/queue_poller.rb', line 10 def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) raise ArgumentError, "must specify non-nil queue_url" if queue_url.blank? @queue_url = queue_url @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client) @connection_pool_block = connection_pool_block @messages_processed = 0 @messages_received = 0 @format = format @max_messages = max_messages @poller_configuration = { wait_time_seconds: 10, # amount of time a long polling receive call can wait for a message before receiving a empty response (which will trigger another polling request) idle_timeout: 20, # disconnects poller after 20 seconds of idle time skip_delete: true, # manually delete messages }.merge(poller_configuration || {}) @poller_configuration[:idle_timeout] = idle_timeout unless idle_timeout.nil? if message_handler if message_handler.ancestors.include?(Pheme::MessageHandler) @message_handler = message_handler else raise ArgumentError, 'Invalid message handler, must inherit from Pheme::MessageHandler' end end @block_message_handler = block_message_handler raise ArgumentError, 'only provide a message_handler or a block, not both' if @message_handler && @block_message_handler if max_messages queue_poller.before_request do |stats| throw :stop_polling if stats.received_message_count >= max_messages end end end |
Instance Attribute Details
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
8 9 10 |
# File 'lib/pheme/queue_poller.rb', line 8 def connection_pool_block @connection_pool_block end |
#format ⇒ Object
Returns the value of attribute format.
8 9 10 |
# File 'lib/pheme/queue_poller.rb', line 8 def format @format end |
#max_messages ⇒ Object
Returns the value of attribute max_messages.
8 9 10 |
# File 'lib/pheme/queue_poller.rb', line 8 def max_messages @max_messages end |
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
8 9 10 |
# File 'lib/pheme/queue_poller.rb', line 8 def poller_configuration @poller_configuration end |
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
8 9 10 |
# File 'lib/pheme/queue_poller.rb', line 8 def queue_poller @queue_poller end |
#queue_url ⇒ Object
Returns the value of attribute queue_url.
8 9 10 |
# File 'lib/pheme/queue_poller.rb', line 8 def queue_url @queue_url end |
Instance Method Details
#get_content(body) ⇒ Object
124 125 126 |
# File 'lib/pheme/queue_poller.rb', line 124 def get_content(body) decompress(body['Message']) end |
#get_metadata(message_body) ⇒ Object
120 121 122 |
# File 'lib/pheme/queue_poller.rb', line 120 def get_metadata(message_body) message_body.except('Message', 'Records') end |
#handle(message, metadata, message_attributes) ⇒ Object
138 139 140 141 142 143 144 145 146 |
# File 'lib/pheme/queue_poller.rb', line 138 def handle(message, metadata, message_attributes) if @message_handler @message_handler.new(message: message, metadata: metadata, message_attributes: message_attributes).handle elsif @block_message_handler @block_message_handler.call(message, metadata, message_attributes) else raise NotImplementedError end end |
#parse_body(queue_message) ⇒ Object
Parses queue_message.body as JSON, parses the content returned by get_content according to format, stores the content in the body hash, and returns the parsed content.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/pheme/queue_poller.rb', line 81 def parse_body() = JSON.parse(.body) raw_content = get_content() body = () case format when :csv parsed_content = parse_csv(raw_content) body['Message'] = raw_content when :json parsed_content = parse_json(raw_content) body['Message'] = parsed_content else method_name = :"parse_#{format}" raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name) parsed_content = __send__(method_name, raw_content) body['Records'] = parsed_content end (, body) parsed_content end |
#parse_csv(message_contents) ⇒ Object
128 129 130 131 |
# File 'lib/pheme/queue_poller.rb', line 128 def parse_csv(message_contents) parsed_body = SmarterCSV.process(StringIO.new(message_contents)) parsed_body.map { |item| ResourceStruct::FlexStruct.new(item) } end |
#parse_json(message_contents) ⇒ Object
133 134 135 136 |
# File 'lib/pheme/queue_poller.rb', line 133 def parse_json(message_contents) parsed_body = JSON.parse(message_contents) ResourceStruct::FlexStruct.new({ wrapper: parsed_body }).wrapper end |
#parse_message_attributes(queue_message) ⇒ Object
110 111 112 113 114 115 116 117 118 |
# File 'lib/pheme/queue_poller.rb', line 110 def () = JSON.parse(.body) = {} ['MessageAttributes']&.each do |key, value| [key.to_sym] = (value) end end |
#parse_metadata(queue_message) ⇒ Object
105 106 107 108 |
# File 'lib/pheme/queue_poller.rb', line 105 def parse_metadata(queue_message) message_body = JSON.parse(queue_message.body) { timestamp: message_body['Timestamp'], topic_arn: message_body['TopicArn'] } end |
#poll ⇒ Object
rubocop:disable Metrics/AbcSize
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/pheme/queue_poller.rb', line 56 def poll time_start = log_polling_start queue_poller.poll(poller_configuration) do |queue_message| @messages_received += 1 Pheme.logger.tagged(queue_message.message_id) do content = parse_body(queue_message) metadata = parse_metadata(queue_message) message_attributes = parse_message_attributes(queue_message) with_optional_connection_pool_block { handle(content, metadata, message_attributes) } queue_poller.delete_message(queue_message) log_delete(queue_message) @messages_processed += 1 rescue SignalException throw :stop_polling rescue StandardError => e Pheme.logger.error(e) Pheme.capture_exception(e, "#{self.class} failed to process message", { message: content }) end end log_polling_end(time_start) end |