Class: DerivativeRodeo::StorageLocations::SqsLocation

Inherits:
BaseLocation
  • Object
Defined in:
lib/derivative_rodeo/storage_locations/sqs_location.rb

Overview

Location to download and upload files to SQS. It uploads a file_uri to the queue, not the contents of that file. Reading from the queue is not currently implemented.


Class Attributes

Attributes inherited from BaseLocation

#file_uri

Class Method Summary

Instance Method Summary

Methods inherited from BaseLocation

build, #derived_file_from, #file_basename, #file_extension, file_path_from_parts, from_uri, inherited, #initialize, load_location, location_name, locations, #matching_locations_in_file_dir, register_location, #tmp_file_dir, #with_existing_tmp_path, #with_new_extension, #with_new_tmp_path, #with_tmp_path

Constructor Details

This class inherits a constructor from DerivativeRodeo::StorageLocations::BaseLocation

Instance Attribute Details

#batch_size ⇒ Integer

Returns:

  • (Integer)


# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 24

class_attribute :batch_size, default: 10

#use_real_sqs ⇒ Object

When true, use the real SQS; when false, use a fake one. You probably don’t want to use the fake one in production, but it is exposed in this manner to ease testing of downstream dependencies.

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 30

class_attribute :use_real_sqs, default: true

Class Method Details

.adapter_prefix(config: DerivativeRodeo.config) ⇒ Object



# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 48

def self.adapter_prefix(config: DerivativeRodeo.config)
  "#{scheme}://#{config.aws_sqs_region}.amazonaws.com/#{config.}/#{config.aws_sqs_queue}/"
end

.create_uri(path:, parts: 1) ⇒ String

Create a new URI of the class's type. The parts argument should have a default in implementing classes, and it must support a number or the symbol :all.

Parameters:

  • path (String)
  • parts (Integer, :all) (defaults to: 1): for SQS the default of 1 yields file_name.ext. We use 1 because it helps with tmp files, but the actual SQS queue does not have a file path.

Returns:

  • (String)


# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 43

def self.create_uri(path:, parts: 1)
  file_path = file_path_from_parts(path: path, parts: parts)
  "#{adapter_prefix}#{file_path}"
end
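
As a rough illustration of how the prefix and the trailing path segments compose, here is a minimal sketch. The inherited file_path_from_parts is not shown on this page, so the last_parts helper below is a hypothetical stand-in for it, and the scheme, region, account id, and queue name in example_prefix are made-up values.

```ruby
# Hypothetical stand-in for the inherited file_path_from_parts: keep the
# last `parts` segments of the path, or every segment for :all.
def last_parts(path:, parts:)
  segments = path.split('/').reject(&:empty?)
  return segments.join('/') if parts == :all

  segments.last(parts).join('/')
end

# Made-up prefix in the general shape that adapter_prefix builds.
def example_prefix
  'sqs://us-west-2.amazonaws.com/123456789012/example-queue/'
end

# Sketch of create_uri: prefix plus the trailing path segments.
def sketch_create_uri(path:, parts: 1)
  "#{example_prefix}#{last_parts(path: path, parts: parts)}"
end

puts sketch_create_uri(path: '/tmp/work/ocr/page-1.tiff')
puts sketch_create_uri(path: '/tmp/work/ocr/page-1.tiff', parts: 2)
```

With parts left at 1 only the file name survives, which matches the note above that the queue itself has no file path.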

Instance Method Details

#add(message:) ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 122

def add(message:)
  client.send_message({
                        queue_url: queue_url,
                        message_body: message
                      })
end

#add_batch(messages:) ⇒ Object



# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 129

def add_batch(messages:)
  client.send_message_batch({
                              queue_url: queue_url,
                              entries: messages
                            })
end
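
The entries passed to add_batch are hashes with an id and a message_body, which is the shape #write builds. The sketch below shows that shape against a hypothetical RecordingClient; it is not the AwsSqsFauxClient mentioned on this page, and the queue URL is a placeholder.

```ruby
require 'json'
require 'securerandom'

# Hypothetical client that only records the one call used here.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def send_message_batch(params)
    @calls << params
    params
  end
end

# Build batch entries: each needs an id that is unique within the batch
# and a string message body.
def build_entries(bodies)
  bodies.map { |body| { id: SecureRandom.uuid, message_body: body } }
end

client = RecordingClient.new
entries = build_entries([{ 'a' => ['b'] }.to_json, { 'c' => ['d'] }.to_json])
client.send_message_batch({ queue_url: 'https://example.invalid/queue', entries: entries })
puts client.calls.first[:entries].size
```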

#client ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 102

def client
  @client ||= if use_real_sqs?
                if config.aws_sqs_access_key_id && config.aws_sqs_secret_access_key
                  Aws::SQS::Client.new(
                    region: config.aws_sqs_region,
                    credentials: Aws::Credentials.new(
                      config.aws_sqs_access_key_id,
                      config.aws_sqs_secret_access_key
                    )
                  )
                else
                  Aws::SQS::Client.new(region: config.aws_sqs_region)
                end
              else
                # We are not requiring this file; except in the spec context.
                AwsSqsFauxClient.new
              end
end

#exist? ⇒ Boolean

Existence is futile: there is no way to check whether a specific item is in an SQS queue, so this always returns false.

Returns:

  • (Boolean)

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 75

def exist?
  false
end

#file_dir ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 166

def file_dir
  @file_dir ||= file_uri_parts[:file_dir]
end

#file_name ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 172

def file_name
  @file_name ||= file_uri_parts[:file_name]
end

#file_path ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 162

def file_path
  @file_path ||= [file_dir, file_name].join('/')
end

#file_uri_parts ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 194

def file_uri_parts
  return @file_uri_parts if @file_uri_parts
  uri = URI.parse(file_uri)
  @file_uri_parts = uri&.component&.inject({}) do |hash, component|
    hash[component] = uri.send(component)
    hash
  end
  @file_uri_parts[:region] = @file_uri_parts[:host]&.split('.')&.[](0)
  path_parts = @file_uri_parts[:path]&.split('/')
  @file_uri_parts[:account_id] = path_parts&.[](1)
  @file_uri_parts[:queue_name] = path_parts&.[](2)
  @file_uri_parts[:file_name] = path_parts&.[](-1)
  @file_uri_parts[:file_dir] = path_parts&.[](3..-2)&.join('/')
  @file_uri_parts
end
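
To make the index arithmetic easier to follow, here is a standalone sketch that splits a URI the same way using only the standard library. The sqs_uri_parts name and the example URI (region, account id, queue name, path) are assumptions for illustration.

```ruby
require 'uri'

# Split an SQS-style URI into the pieces file_uri_parts extracts.
def sqs_uri_parts(file_uri)
  uri = URI.parse(file_uri)
  # A leading "/" makes the first element an empty string, so the
  # account id sits at index 1 and the queue name at index 2.
  path_parts = uri.path.split('/')
  {
    scheme: uri.scheme,
    region: uri.host.split('.').first,
    account_id: path_parts[1],
    queue_name: path_parts[2],
    file_dir: path_parts[3..-2].join('/'),
    file_name: path_parts[-1],
    query: uri.query
  }
end

parts = sqs_uri_parts('sqs://us-west-2.amazonaws.com/123456789012/example-queue/ocr/page-1.tiff?template=x')
puts parts[:region]     # us-west-2
puts parts[:queue_name] # example-queue
puts parts[:file_dir]   # ocr
puts parts[:file_name]  # page-1.tiff
```

When the URI has no directory segments between the queue name and the file name, the 3..-2 range is empty and file_dir becomes an empty string.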

#output_json(uri) ⇒ Object



# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 188

def output_json(uri)
  # TODO: Add ability to handle a pre-process-template given to an SQS, and pass that along to the generator when applicable.
  key = DerivativeRodeo::Services::ConvertUriViaTemplateService.call(from_uri: uri, template: template, adapter: self)
  { key => [template] }.to_json
end
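
The message body is a JSON object with a single key mapped to a one-element array holding the template. A minimal sketch of that shape follows; the key and template strings are invented placeholders, since the result of ConvertUriViaTemplateService is not shown on this page.

```ruby
require 'json'

# `key` stands in for whatever ConvertUriViaTemplateService returns.
def sketch_output_json(key:, template:)
  { key => [template] }.to_json
end

body = sketch_output_json(key: 'placeholder-key', template: 'placeholder-template')
puts body
puts JSON.parse(body).keys.first
```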

#params ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 184

def params
  @params ||= CGI.parse(file_uri_parts[:query]) if file_uri_parts[:query]
end

#queue_name ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 154

def queue_name
  @queue_name ||= file_uri_parts[:queue_name]
rescue StandardError
  raise Errors::QueueMissingError
end

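#queue_url ⇒ Object

Resolves the queue URL from the queue name. The source comment attached to this method also carries a commented-out draft for reading from the queue, which is not currently implemented:

response = client.receive_message({
                                    queue_url: queue_url,
                                    max_number_of_messages: batch_size
                                  })
response.messages.map do |message|
  JSON.parse(message.body)
end
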
# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 148

def queue_url
  @queue_url ||= client.get_queue_url(queue_name: queue_name).queue_url
end

#scheme ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 180

def scheme
  file_uri_parts&.[](:scheme)
end

#template ⇒ Object

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 176

def template
  params&.[]('template')&.first
end
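
The template travels in the query string of the file_uri. The sketch below pulls it out with URI.decode_www_form from the standard library rather than CGI.parse, grouping values into arrays so the result has the same hash-of-arrays shape. The template_from name and the example URIs are assumptions for illustration.

```ruby
require 'uri'

# Return the first "template" query value of a URI, or nil if absent.
def template_from(file_uri)
  query = URI.parse(file_uri).query
  return nil unless query

  # Group repeated keys into arrays, mirroring the shape CGI.parse returns.
  params = URI.decode_www_form(query).each_with_object({}) do |(key, value), hash|
    (hash[key] ||= []) << value
  end
  params['template']&.first
end

puts template_from('sqs://us-west-2.amazonaws.com/123456789012/example-queue/page-1.tiff?template=file%3A%2F%2Ftmp%2Fout')
p template_from('sqs://us-west-2.amazonaws.com/123456789012/example-queue/page-1.tiff')
```

The safe-navigation call on the last line means a URI without a template parameter yields nil instead of raising.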

#write ⇒ String

Queues one message for each file found under the tmp file's directory. The file contents themselves are not uploaded.

Returns:

  • (String)

    the file_uri

# File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 84

def write
  # Guard: the tmp file must exist before anything is queued.
  raise Errors::FileMissingError, "Use write within a with_new_tmp_path block and fill the tmp file with data before writing" unless File.exist?(tmp_file_path)
  # Guard: the configured batch size must not exceed the configured maximum.
  raise Errors::MaxQqueueSize.new(batch_size: batch_size) if batch_size > config.aws_sqs_max_batch_size
  batch = []
  Dir.glob("#{File.dirname(tmp_file_path)}/**/**").each.with_index do |fp, i|
    batch << { id: SecureRandom.uuid, message_body: output_json("file://#{fp}") }
    # `%` binds tighter than `+`, so i + 1 is parenthesized to flush every batch_size items.
    if ((i + 1) % batch_size).zero?
      add_batch(messages: batch)
      batch = []
    end
  end

  # Ensure we're flushing the batched up queue as part of completing the write.
  add_batch(messages: batch) if batch.present?
  file_uri
end
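
The batching pattern in #write can be sketched independently of SQS. The helper names below are hypothetical. In Ruby `%` binds tighter than `+`, so the parentheses around `i + 1` matter when testing for a full batch; each_slice expresses the same grouping without any index arithmetic.

```ruby
# Group items into batches of batch_size, flushing each full batch and
# then whatever remains at the end.
def batches_with_index(items, batch_size)
  flushed = []
  batch = []
  items.each.with_index do |item, i|
    batch << item
    # Without the inner parentheses this would parse as i + (1 % batch_size).
    if ((i + 1) % batch_size).zero?
      flushed << batch
      batch = []
    end
  end
  flushed << batch unless batch.empty?
  flushed
end

# The same grouping using Enumerable#each_slice.
def batches_with_slice(items, batch_size)
  items.each_slice(batch_size).to_a
end

p batches_with_index((1..25).to_a, 10).map(&:size) # [10, 10, 5]
p batches_with_slice((1..25).to_a, 10).map(&:size) # [10, 10, 5]
```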