Class: DerivativeRodeo::StorageLocations::SqsLocation
- Inherits: BaseLocation
  - Object
  - BaseLocation
  - DerivativeRodeo::StorageLocations::SqsLocation
- Defined in: lib/derivative_rodeo/storage_locations/sqs_location.rb
Overview
Location to download and upload files to SQS. It uploads a file_uri to the queue, not the contents of that file. Reading from the queue is not currently implemented.
Class Attributes
- #batch_size ⇒ Integer
- #use_real_sqs ⇒ Object
  When true, use the real SQS; when false, use a fake one.
Attributes inherited from BaseLocation
Class Method Summary
- .adapter_prefix(config: DerivativeRodeo.config) ⇒ Object
- .create_uri(path:, parts: 1) ⇒ String
  Create a new URI of the class's type.
Instance Method Summary
- #add(message:) ⇒ Object
- #add_batch(messages:) ⇒ Object
- #client ⇒ Object
- #exist? ⇒ Boolean
  Existence is futile.
- #file_dir ⇒ Object
- #file_name ⇒ Object private
- #file_path ⇒ Object private
- #file_uri_parts ⇒ Object
- #output_json(uri) ⇒ Object
- #params ⇒ Object
- #queue_name ⇒ Object private
- #queue_url ⇒ Object
- #scheme ⇒ Object
- #template ⇒ Object
- #write ⇒ String
  Write the tmp file to the file_uri.
Methods inherited from BaseLocation
build, #derived_file_from, #file_basename, #file_extension, file_path_from_parts, from_uri, inherited, #initialize, load_location, location_name, locations, #matching_locations_in_file_dir, register_location, #tmp_file_dir, #with_existing_tmp_path, #with_new_extension, #with_new_tmp_path, #with_tmp_path
Constructor Details
This class inherits a constructor from DerivativeRodeo::StorageLocations::BaseLocation
Instance Attribute Details
#batch_size ⇒ Integer
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 24
    class_attribute :batch_size, default: 10
#use_real_sqs ⇒ Object
When true, use the real SQS; when false, use a fake one. You probably don't want to use the fake one in production, but it is exposed this way to ease testing of downstream dependencies.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 30
    class_attribute :use_real_sqs, default: true
Class Method Details
.adapter_prefix(config: DerivativeRodeo.config) ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 48
    def self.adapter_prefix(config: DerivativeRodeo.config)
      "#{scheme}://#{config.aws_sqs_region}.amazonaws.com/#{config.aws_sqs_account_id}/#{config.aws_sqs_queue}/"
    end
.create_uri(path:, parts: 1) ⇒ String
Create a new URI of the class's type. The parts argument should have a default in implementing classes and must support a number or the symbol :all.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 43
    def self.create_uri(path:, parts: 1)
      file_path = file_path_from_parts(path: path, parts: parts)
      "#{adapter_prefix}#{file_path}"
    end
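As a rough illustration of how these two class methods compose a URI, the sketch below rebuilds the same string interpolation with a stand-in config. The `FakeConfig` struct, its values, the `'sqs'` scheme default, and the `keep_last_parts` helper are all hypothetical. In particular, the assumption that `file_path_from_parts` keeps the trailing `parts` segments of the path is not confirmed by this page.

```ruby
# Hypothetical stand-in for DerivativeRodeo.config; the values are made up.
FakeConfig = Struct.new(:aws_sqs_region, :aws_sqs_account_id, :aws_sqs_queue, keyword_init: true)

EXAMPLE_CONFIG = FakeConfig.new(
  aws_sqs_region: 'us-east-1',
  aws_sqs_account_id: '123456789012',
  aws_sqs_queue: 'derivatives'
)

# Mirrors the interpolation in .adapter_prefix; the scheme is passed in
# explicitly here (ASSUMPTION: it is 'sqs' for this location).
def example_adapter_prefix(config:, scheme: 'sqs')
  "#{scheme}://#{config.aws_sqs_region}.amazonaws.com/#{config.aws_sqs_account_id}/#{config.aws_sqs_queue}/"
end

# ASSUMPTION: keeps the last `parts` path segments, or the whole path for :all.
def keep_last_parts(path:, parts:)
  return path if parts == :all

  path.split('/').last(parts).join('/')
end

def example_create_uri(path:, parts: 1, config: EXAMPLE_CONFIG)
  "#{example_adapter_prefix(config: config)}#{keep_last_parts(path: path, parts: parts)}"
end

puts example_create_uri(path: '/tmp/work/ocr/page-1.txt')
puts example_create_uri(path: '/tmp/work/ocr/page-1.txt', parts: 2)
```

Under these assumptions the first call ends in `derivatives/page-1.txt` and the second in `derivatives/ocr/page-1.txt`.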
Instance Method Details
#add(message:) ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 122
    def add(message:)
      client.send_message({
        queue_url: queue_url,
        message_body: message
      })
    end
#add_batch(messages:) ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 129
    def add_batch(messages:)
      client.send_message_batch({
        queue_url: queue_url,
        entries: messages
      })
    end
#client ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 102
    def client
      @client ||= if use_real_sqs?
                    if config.aws_sqs_access_key_id && config.aws_sqs_secret_access_key
                      Aws::SQS::Client.new(
                        region: config.aws_sqs_region,
                        credentials: Aws::Credentials.new(
                          config.aws_sqs_access_key_id,
                          config.aws_sqs_secret_access_key
                        )
                      )
                    else
                      Aws::SQS::Client.new(region: config.aws_sqs_region)
                    end
                  else
                    # We are not requiring this file; except in the spec context.
                    AwsSqsFauxClient.new
                  end
    end
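The `AwsSqsFauxClient` used when `use_real_sqs` is false is not shown on this page. To illustrate the idea, here is a minimal sketch of what an in-memory stand-in could look like. The class name `InMemorySqsClient`, the placeholder queue URL, and the internals are hypothetical; it only mimics the three client calls this class relies on (`get_queue_url`, `send_message`, `send_message_batch`), each taking a single params hash.

```ruby
# Hypothetical in-memory stand-in for Aws::SQS::Client.
# This is NOT the gem's AwsSqsFauxClient, only a sketch of the same idea.
class InMemorySqsClient
  QueueUrlResult = Struct.new(:queue_url)

  attr_reader :messages

  def initialize
    # Maps a queue URL to the message bodies "sent" to it.
    @messages = Hash.new { |hash, key| hash[key] = [] }
  end

  # Returns an object that responds to #queue_url.
  def get_queue_url(params = {})
    QueueUrlResult.new("https://sqs.example.invalid/000000000000/#{params.fetch(:queue_name)}")
  end

  def send_message(params = {})
    @messages[params.fetch(:queue_url)] << params.fetch(:message_body)
  end

  def send_message_batch(params = {})
    params.fetch(:entries).each do |entry|
      @messages[params.fetch(:queue_url)] << entry.fetch(:message_body)
    end
  end
end

client = InMemorySqsClient.new
url = client.get_queue_url(queue_name: 'derivatives').queue_url
client.send_message({ queue_url: url, message_body: 'first' })
client.send_message_batch({ queue_url: url, entries: [{ id: '1', message_body: 'second' }] })
puts client.messages[url].inspect
```

A stand-in like this lets downstream specs assert on what would have been queued without any network access.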
#exist? ⇒ Boolean
Existence is futile, and there's no way to check whether a specific item is in an SQS queue.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 75
    def exist?
      false
    end
#file_dir ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 166
    def file_dir
      @file_dir ||= file_uri_parts[:file_dir]
    end
#file_name ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 172
    def file_name
      @file_name ||= file_uri_parts[:file_name]
    end
#file_path ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 162
    def file_path
      @file_path ||= [file_dir, file_name].join('/')
    end
#file_uri_parts ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 194
    def file_uri_parts
      return @file_uri_parts if @file_uri_parts

      uri = URI.parse(file_uri)
      @file_uri_parts = uri&.component&.inject({}) do |hash, component|
        hash[component] = uri.send(component)
        hash
      end
      @file_uri_parts[:region] = @file_uri_parts[:host]&.split('.')&.[](0)
      path_parts = @file_uri_parts[:path]&.split('/')
      @file_uri_parts[:account_id] = path_parts&.[](1)
      @file_uri_parts[:queue_name] = path_parts&.[](2)
      @file_uri_parts[:file_name] = path_parts&.[](-1)
      @file_uri_parts[:file_dir] = path_parts&.[](3..-2)&.join('/')
      @file_uri_parts
    end
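To make the index arithmetic in `#file_uri_parts` concrete, the following sketch applies the same splitting rules to a made-up URI using only the standard library. The helper name `split_sqs_uri` and the example URI are hypothetical, and the sketch reads the URI accessors directly instead of iterating over `uri.component`.

```ruby
require 'uri'

# Splits an SQS-style location URI into the named parts that
# #file_uri_parts exposes (hypothetical helper, same index rules).
def split_sqs_uri(file_uri)
  uri = URI.parse(file_uri)
  # The leading "/" in the path yields an empty first element,
  # which is why the account id sits at index 1.
  path_parts = uri.path.split('/')

  {
    scheme: uri.scheme,
    region: uri.host&.split('.')&.first,
    account_id: path_parts[1],
    queue_name: path_parts[2],
    file_dir: path_parts[3..-2]&.join('/'),
    file_name: path_parts[-1],
    query: uri.query
  }
end

example = 'sqs://us-east-1.amazonaws.com/123456789012/derivatives/ocr/batch-1/page-1.txt?template=example'
split_sqs_uri(example).each { |name, value| puts "#{name}: #{value}" }
```

For this URI the region is `us-east-1`, the queue name is `derivatives`, the file directory is `ocr/batch-1`, and the file name is `page-1.txt`.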
#output_json(uri) ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 188
    def output_json(uri)
      # TODO: Add ability to handle a pre-process-template given to an SQS, and pass that along to the generator when applicable.
      key = DerivativeRodeo::Services::ConvertUriViaTemplateService.call(from_uri: uri, template: template, adapter: self)
      { key => [template] }.to_json
    end
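The message body produced by `#output_json` is a one-entry JSON object whose key maps to a single-element array holding the template. The sketch below reproduces only that shape. In the real method the key comes from `ConvertUriViaTemplateService`; here it is passed in directly, and the helper name `example_message_body` and both string values are made-up placeholders.

```ruby
require 'json'

# Builds the same { key => [template] } JSON shape as #output_json.
# Hypothetical helper: the key is supplied by the caller instead of
# being derived from the URI and template.
def example_message_body(key:, template:)
  { key => [template] }.to_json
end

body = example_message_body(
  key: 's3://example-bucket/batch-1/page-1.txt',
  template: 'example-template'
)
puts body

# A consumer can recover the key and its template list with JSON.parse.
parsed = JSON.parse(body)
puts parsed.keys.first
puts parsed.values.first.inspect
```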
#params ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 184
    def params
      @params ||= CGI.parse(file_uri_parts[:query]) if file_uri_parts[:query]
    end
#queue_name ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 154
    def queue_name
      @queue_name ||= file_uri_parts[:queue_name]
    rescue StandardError
      raise Errors::QueueMissingError
    end
#queue_url ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 148
    def queue_url
      @queue_url ||= client.get_queue_url(queue_name: queue_name).queue_url
    end
#scheme ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 180
    def scheme
      file_uri_parts&.[](:scheme)
    end
#template ⇒ Object
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 176
    def template
      params&.[]('template')&.first
    end
#write ⇒ String
Write the tmp file to the file_uri.
    # File 'lib/derivative_rodeo/storage_locations/sqs_location.rb', line 84
    def write
      raise Errors::FileMissingError.new("Use write within a with_new_tmp_path block and fill the tmp file with data before writing") unless File.exist?(tmp_file_path)
      raise Errors::MaxQqueueSize.new(batch_size: batch_size) if batch_size > config.aws_sqs_max_batch_size

      batch = []
      Dir.glob("#{File.dirname(tmp_file_path)}/**/**").each.with_index do |fp, i|
        batch << { id: SecureRandom.uuid, message_body: output_json("file://#{fp}") }
        if ((i + 1) % batch_size).zero?
          add_batch(messages: batch)
          batch = []
        end
      end

      # Ensure we're flushing the batched up queue as part of completing the write.
      add_batch(messages: batch) if batch.present?
      file_uri
    end
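The batching in `#write` hinges on flushing every `batch_size` items and once more at the end for any remainder. Because `%` binds more tightly than `+` in Ruby, the index arithmetic needs explicit parentheses: `i + 1 % n` evaluates as `i + (1 % n)`. The sketch below isolates that loop with plain strings and a block standing in for `add_batch`; the method name `in_batches` is hypothetical.

```ruby
# Hypothetical helper isolating the flush-every-N pattern used by #write.
# Yields each full batch, then any leftover items as a final batch.
def in_batches(items, batch_size)
  batch = []
  items.each.with_index do |item, i|
    batch << item
    next unless ((i + 1) % batch_size).zero?

    yield batch
    batch = [] # start a fresh array so yielded batches are not mutated
  end
  yield batch unless batch.empty?
end

sent = []
in_batches(%w[a b c d e], 2) { |batch| sent << batch }
p sent # => [["a", "b"], ["c", "d"], ["e"]]

# Without the parentheses the modulo applies to 1 alone:
p(3 + 1 % 2)   # => 4
p((3 + 1) % 2) # => 0
```

For a plain array, `items.each_slice(batch_size)` would give the same grouping; the index-based form is shown because it mirrors the loop in the listing.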