Class: Sqewer::LocalConnection
- Inherits: Connection
  - Object
  - Connection
  - Sqewer::LocalConnection
- Defined in: lib/sqewer/local_connection.rb
Constant Summary
- ASSUME_DEADLETTER_AFTER_N_DELIVERIES = 10
Constants inherited from Connection
Connection::BATCH_RECEIVE_SIZE, Connection::DEFAULT_TIMEOUT_SECONDS, Connection::MAX_RANDOM_FAILURES_PER_CALL, Connection::MAX_RANDOM_RECEIVE_FAILURES, Connection::NotOurFaultAwsError
Class Method Summary
- .parse_queue_url(queue_url_starting_with_sqlite3_proto) ⇒ Object
Instance Method Summary
- #delete_multiple_messages {|#delete_message| ... } ⇒ void
  Deletes multiple messages after they have all been successfully decoded and processed.
- #initialize(queue_url_with_sqlite3_scheme) ⇒ LocalConnection (constructor)
  A new instance of LocalConnection.
- #receive_messages ⇒ Array<Message>
  An array of Message objects.
- #send_multiple_messages {|#send_message| ... } ⇒ void
- #truncate! ⇒ Object
  Only gets used in tests.
Methods inherited from Connection
client, default, #delete_message, #send_message
Constructor Details
#initialize(queue_url_with_sqlite3_scheme) ⇒ LocalConnection
Returns a new instance of LocalConnection.
    # File 'lib/sqewer/local_connection.rb', line 20
    def initialize(queue_url_with_sqlite3_scheme)
      require 'sqlite3'
      @db_path, @queue_name = self.class.parse_queue_url(queue_url_with_sqlite3_scheme)
      with_db do |db|
        db.execute("CREATE TABLE IF NOT EXISTS sqewer_messages_v3 (
          id INTEGER PRIMARY KEY AUTOINCREMENT ,
          queue_name VARCHAR NOT NULL,
          receipt_handle VARCHAR NOT NULL,
          deliver_after_epoch INTEGER,
          times_delivered_so_far INTEGER DEFAULT 0,
          last_delivery_at_epoch INTEGER,
          visible BOOLEAN DEFAULT 't',
          sent_timestamp_millis INTEGER,
          message_body TEXT)"
        )
        db.execute("CREATE INDEX IF NOT EXISTS on_receipt_handle ON sqewer_messages_v3 (receipt_handle)")
        db.execute("CREATE INDEX IF NOT EXISTS on_queue_name ON sqewer_messages_v3 (queue_name)")
      end
    rescue LoadError => e
      raise e, "You need the sqlite3 gem in your Gemfile to use LocalConnection. Add it to your Gemfile (`gem 'sqlite3'')"
    end
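The constructor loads sqlite3 lazily and re-raises a LoadError with an installation hint. A minimal sketch of that pattern in isolation follows; the helper name require_with_hint and the library names passed to it are hypothetical and not part of Sqewer.

```ruby
# Hypothetical helper illustrating the lazy-require pattern; not part of Sqewer.
def require_with_hint(library, gem_name)
  require library
rescue LoadError => e
  # `raise e, message` re-raises the same exception class with a new message.
  raise e, "You need the #{gem_name} gem to use this feature. Add it to your Gemfile (gem '#{gem_name}')"
end

hint = nil
begin
  # This library name is made up, so the require is expected to fail.
  require_with_hint('surely_missing_library_for_sketch', 'surely_missing')
rescue LoadError => e
  hint = e.message
end

puts hint
```

Deferring the require to the constructor means an application that never instantiates LocalConnection does not need the optional dependency installed.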
Class Method Details
.parse_queue_url(queue_url_starting_with_sqlite3_proto) ⇒ Object
    # File 'lib/sqewer/local_connection.rb', line 5
    def self.parse_queue_url(queue_url_starting_with_sqlite3_proto)
      uri = URI.parse(queue_url_starting_with_sqlite3_proto)
      unless uri.scheme == 'sqlite3'
        raise "The scheme of the SQS queue URL should be with `sqlite3' but was %s" % uri.scheme
      end

      path_components = ['/', uri.hostname, uri.path].reject(&:nil?).reject(&:empty?).join('/').squeeze('/')
      dbfile_path = File.(path_components)

      queue_name = Rack::Utils.parse_nested_query(uri.query).fetch('queue', 'sqewer_local')

      [dbfile_path, queue_name]
    end
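To illustrate how a sqlite3 queue URL splits into a database path and a queue name, here is a standalone sketch that mirrors the logic above using only the standard library. The function name sketch_parse_queue_url and the example URLs are hypothetical. Two assumptions are swapped in: URI.decode_www_form replaces Rack::Utils.parse_nested_query, and File.expand_path is assumed for the File call, whose method name the listing does not show. The expected paths assume a Unix-like filesystem.

```ruby
require 'uri'

# Hypothetical stand-in for Sqewer::LocalConnection.parse_queue_url (stdlib only).
def sketch_parse_queue_url(url)
  uri = URI.parse(url)
  unless uri.scheme == 'sqlite3'
    raise ArgumentError, "expected a sqlite3 scheme but got #{uri.scheme.inspect}"
  end

  # Host and path are joined, so sqlite3://tmp/q.db and sqlite3:///tmp/q.db
  # both resolve to /tmp/q.db.
  path = ['/', uri.hostname, uri.path].reject(&:nil?).reject(&:empty?).join('/').squeeze('/')
  dbfile_path = File.expand_path(path) # assumption: the listing omits the method name

  # Assumption: flat query parsing is enough here; the original uses Rack.
  params = URI.decode_www_form(uri.query.to_s).to_h
  [dbfile_path, params.fetch('queue', 'sqewer_local')]
end

with_queue    = sketch_parse_queue_url('sqlite3://tmp/sqewer.db?queue=emails')
without_queue = sketch_parse_queue_url('sqlite3:///tmp/sqewer.db')

p with_queue
p without_queue
```

When the URL carries no queue parameter, the queue name falls back to sqewer_local, matching the default in the listing.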
Instance Method Details
#delete_multiple_messages {|#delete_message| ... } ⇒ void
This method returns an undefined value.
Deletes multiple messages after they have all been successfully decoded and processed.
    # File 'lib/sqewer/local_connection.rb', line 62
    def delete_multiple_messages
      buffer = DeleteBuffer.new
      yield(buffer)
      (buffer.)
    end
#receive_messages ⇒ Array<Message>
Returns an array of Message objects.
    # File 'lib/sqewer/local_connection.rb', line 43
    def receive_messages
      .map do |(receipt_handle, , )|
        Message.new(receipt_handle, , {'SentTimestamp' => })
      end
    end
#send_multiple_messages {|#send_message| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/sqewer/local_connection.rb', line 51
    def send_multiple_messages
      buffer = SendBuffer.new
      yield(buffer)
       = buffer.
      ()
    end
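Both #send_multiple_messages and #delete_multiple_messages follow the same shape: create a buffer, yield it to the caller's block, then act on everything the buffer collected. The sketch below shows that shape with an in-memory store. SketchConnection, SketchSendBuffer, and the stored accessor are hypothetical names for illustration only; the real class persists to SQLite through helpers whose names are not shown in the listing.

```ruby
# Hypothetical in-memory stand-in; not part of Sqewer.
class SketchConnection
  # Collects message bodies handed to it inside the caller's block.
  class SketchSendBuffer
    attr_reader :messages

    def initialize
      @messages = []
    end

    def send_message(body)
      @messages << body
    end
  end

  attr_reader :stored

  def initialize
    @stored = []
  end

  # Yields a buffer, then stores everything it collected in one step.
  def send_multiple_messages
    buffer = SketchSendBuffer.new
    yield(buffer)
    @stored.concat(buffer.messages)
    nil
  end
end

conn = SketchConnection.new
conn.send_multiple_messages do |buffer|
  buffer.send_message('{"job":"first"}')
  buffer.send_message('{"job":"second"}')
end

p conn.stored
```

Collecting into a buffer first lets the connection write the whole batch at once after the block returns, so a block that raises partway through leaves nothing stored.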
#truncate! ⇒ Object
Only gets used in tests
    # File 'lib/sqewer/local_connection.rb', line 69
    def truncate!
      with_db do |db|
        db.execute("DELETE FROM sqewer_messages_v3 WHERE queue_name = ?", @queue_name)
      end
    end