Class: Pheme::QueuePoller

Inherits:
Object
  • Object
show all
Includes:
Compression
Defined in:
lib/pheme/queue_poller.rb

Overview

rubocop:disable Metrics/ClassLength

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Compression

#compress, #decompress

Constructor Details

#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) ⇒ QueuePoller

Returns a new instance of QueuePoller.

Raises:

  • (ArgumentError)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pheme/queue_poller.rb', line 10

def initialize(queue_url:,
  connection_pool_block: false,
  max_messages: nil,
  format: :json,
  poller_configuration: {},
  sqs_client: nil,
  idle_timeout: nil,
  message_handler: nil,
  &block_message_handler)
  raise ArgumentError, "must specify non-nil queue_url" if queue_url.blank?

  @queue_url = queue_url
  @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client)
  @connection_pool_block = connection_pool_block
  @messages_processed = 0
  @messages_received = 0
  @format = format
  @max_messages = max_messages
  @poller_configuration = {
    wait_time_seconds: 10, # amount of time a long polling receive call can wait for a message before receiving a empty response (which will trigger another polling request)
    idle_timeout: 20, # disconnects poller after 20 seconds of idle time
    skip_delete: true, # manually delete messages
  }.merge(poller_configuration || {})

  @poller_configuration[:idle_timeout] = idle_timeout unless idle_timeout.nil?

  if message_handler
    if message_handler.ancestors.include?(Pheme::MessageHandler)
      @message_handler = message_handler
    else
      raise ArgumentError, 'Invalid message handler, must inherit from Pheme::MessageHandler'
    end
  end

  @block_message_handler = block_message_handler

  raise ArgumentError, 'only provide a message_handler or a block, not both' if @message_handler && @block_message_handler

  if max_messages
    queue_poller.before_request do |stats|
      throw :stop_polling if stats.received_message_count >= max_messages
    end
  end
end

Instance Attribute Details

#connection_pool_blockObject

Returns the value of attribute connection_pool_block.



8
9
10
# File 'lib/pheme/queue_poller.rb', line 8

def connection_pool_block
  @connection_pool_block
end

#formatObject

Returns the value of attribute format.



8
9
10
# File 'lib/pheme/queue_poller.rb', line 8

def format
  @format
end

#max_messagesObject

Returns the value of attribute max_messages.



8
9
10
# File 'lib/pheme/queue_poller.rb', line 8

def max_messages
  @max_messages
end

#poller_configurationObject

Returns the value of attribute poller_configuration.



8
9
10
# File 'lib/pheme/queue_poller.rb', line 8

def poller_configuration
  @poller_configuration
end

#queue_pollerObject

Returns the value of attribute queue_poller.



8
9
10
# File 'lib/pheme/queue_poller.rb', line 8

def queue_poller
  @queue_poller
end

#queue_urlObject

Returns the value of attribute queue_url.



8
9
10
# File 'lib/pheme/queue_poller.rb', line 8

def queue_url
  @queue_url
end

Instance Method Details

#get_content(body) ⇒ Object



124
125
126
# File 'lib/pheme/queue_poller.rb', line 124

def get_content(body)
  decompress(body['Message'])
end

#get_metadata(message_body) ⇒ Object



120
121
122
# File 'lib/pheme/queue_poller.rb', line 120

def (message_body)
  message_body.except('Message', 'Records')
end

#handle(message, metadata, message_attributes) ⇒ Object



138
139
140
141
142
143
144
145
146
# File 'lib/pheme/queue_poller.rb', line 138

def handle(message, , message_attributes)
  if @message_handler
    @message_handler.new(message: message, metadata: , message_attributes: message_attributes).handle
  elsif @block_message_handler
    @block_message_handler.call(message, , message_attributes)
  else
    raise NotImplementedError
  end
end

#parse_body(queue_message) ⇒ Object

returns queue_message.body as hash, stores and parses get_content to body



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/pheme/queue_poller.rb', line 81

def parse_body(queue_message)
  message_body = JSON.parse(queue_message.body)
  raw_content = get_content(message_body)
  body = (message_body)

  case format
  when :csv
    parsed_content = parse_csv(raw_content)
    body['Message'] = raw_content
  when :json
    parsed_content = parse_json(raw_content)
    body['Message'] = parsed_content
  else
    method_name = :"parse_#{format}"
    raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name)

    parsed_content = __send__(method_name, raw_content)
    body['Records'] = parsed_content
  end

  log_message_received(queue_message, body)
  parsed_content
end

#parse_csv(message_contents) ⇒ Object



128
129
130
131
# File 'lib/pheme/queue_poller.rb', line 128

def parse_csv(message_contents)
  parsed_body = SmarterCSV.process(StringIO.new(message_contents))
  parsed_body.map { |item| ResourceStruct::FlexStruct.new(item) }
end

#parse_json(message_contents) ⇒ Object



133
134
135
136
# File 'lib/pheme/queue_poller.rb', line 133

def parse_json(message_contents)
  parsed_body = JSON.parse(message_contents)
  ResourceStruct::FlexStruct.new({ wrapper: parsed_body }).wrapper
end

#parse_message_attributes(queue_message) ⇒ Object



110
111
112
113
114
115
116
117
118
# File 'lib/pheme/queue_poller.rb', line 110

def parse_message_attributes(queue_message)
  message_body = JSON.parse(queue_message.body)
  message_attributes = {}
  message_body['MessageAttributes']&.each do |key, value|
    message_attributes[key.to_sym] = coerce_message_attribute(value)
  end

  message_attributes
end

#parse_metadata(queue_message) ⇒ Object



105
106
107
108
# File 'lib/pheme/queue_poller.rb', line 105

def (queue_message)
  message_body = JSON.parse(queue_message.body)
  { timestamp: message_body['Timestamp'], topic_arn: message_body['TopicArn'] }
end

#pollObject

rubocop:disable Metrics/AbcSize



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/pheme/queue_poller.rb', line 56

def poll
  time_start = log_polling_start
  queue_poller.poll(poller_configuration) do |queue_message|
    @messages_received += 1
    Pheme.logger.tagged(queue_message.message_id) do
      content = parse_body(queue_message)
       = (queue_message)
      message_attributes = parse_message_attributes(queue_message)
      with_optional_connection_pool_block { handle(content, , message_attributes) }
      queue_poller.delete_message(queue_message)
      log_delete(queue_message)
      @messages_processed += 1
    rescue SignalException
      throw :stop_polling
    rescue StandardError => e
      Pheme.logger.error(e)
      Pheme.capture_exception(e, "#{self.class} failed to process message", { message: content })
    end
  end
  log_polling_end(time_start)
end