Class: Sqewer::ConnectionMessagebox
- Inherits: Object
- Defined in: lib/sqewer/connection_messagebox.rb
Overview
A recorder for send_message and delete_message calls. It buffers those calls as if it were a Connection, then executes them inside a synchronized mutex lock to prevent concurrent submits to the Connection object and, consequently, concurrent calls to the SQS client. Calls to the connection are also buffered in the messagebox to implement simple batching of message submits and deletes. For example, imagine your job does this:
context.submit!(dependent_job)
context.submit!(another_dependent_job)
# ...100 lines further on
context.submit!(yet_another_job)
Without buffering, you would be making 3 separate SQS requests and spending more money, whereas a messagebox can buffer those sends and pack them into batches, consequently performing fewer requests.
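The batching effect described above can be sketched without touching SQS. In the sketch below, `CountingConnection`, `SketchBuffer` and `SketchMessagebox` are hypothetical stand-ins written for illustration only; they assume the connection exposes a block-based batch method and are not the library's own classes.

```ruby
# Hypothetical buffer handed to the block: it only records what it is given.
class SketchBuffer
  attr_reader :messages

  def initialize
    @messages = []
  end

  def send_message(body, **kwargs)
    @messages << [body, kwargs]
  end
end

# Hypothetical connection: counts one "request" per non-empty batch.
class CountingConnection
  attr_reader :batch_sizes

  def initialize
    @batch_sizes = []
  end

  # Assumption: the batch API yields a buffer and submits it as a whole.
  def send_multiple_messages
    buffer = SketchBuffer.new
    yield(buffer)
    @batch_sizes << buffer.messages.length unless buffer.messages.empty?
  end
end

# Simplified messagebox that only handles sends.
class SketchMessagebox
  def initialize(connection)
    @connection = connection
    @sends = []
    @mux = Mutex.new
  end

  # Only records the call; nothing reaches the connection yet.
  def send_message(body, **kwargs)
    @mux.synchronize { @sends << [body, kwargs] }
  end

  # Replays every recorded send inside a single batch block.
  def flush!
    @mux.synchronize do
      @connection.send_multiple_messages do |buffer|
        @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) }
      end
      @sends.length.tap { @sends.clear }
    end
  end
end

connection = CountingConnection.new
box = SketchMessagebox.new(connection)
box.send_message('{"job":"a"}')
box.send_message('{"job":"b"}')
box.send_message('{"job":"c"}', delay_seconds: 5)
flushed = box.flush!
puts "flushed=#{flushed} requests=#{connection.batch_sizes.length}"
```

Three recorded sends reach the stand-in connection as one batch, which is the saving the overview refers to. How the real connection splits large batches is outside the scope of this sketch.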
Instance Method Summary
- #delete_message(message_identifier) ⇒ Object
  Saves the given identifier to be deleted from the queue.
- #flush! ⇒ Object
  Flushes all the accumulated commands to the queue connection.
- #initialize(connection) ⇒ ConnectionMessagebox (constructor)
  A new instance of ConnectionMessagebox.
- #send_message(message_body, **kwargs_for_send) ⇒ Object
  Saves the given body and the keyword arguments (such as delay_seconds) to be sent into the queue.
Constructor Details
#initialize(connection) ⇒ ConnectionMessagebox
Returns a new instance of ConnectionMessagebox.
# File 'lib/sqewer/connection_messagebox.rb', line 20
def initialize(connection)
  @connection = connection
  @deletes = []
  @sends = []
  @mux = Mutex.new
end
Instance Method Details
#delete_message(message_identifier) ⇒ Object
Saves the given identifier to be deleted from the queue. If there are more deletes in the same flush, they will be batched using batched deletes.
# File 'lib/sqewer/connection_messagebox.rb', line 41
def delete_message(message_identifier)
  @mux.synchronize {
    @deletes << message_identifier
  }
end
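To illustrate why the append is wrapped in a mutex, here is a minimal sketch in which several threads record identifiers into one shared array. The identifier strings and the thread count are made up for the example; only Mutex#synchronize and Thread are taken from Ruby itself.

```ruby
mux = Mutex.new
deletes = []

# Four worker threads each record 100 hypothetical receipt handles.
threads = 4.times.map do |t|
  Thread.new do
    100.times do |i|
      # Only one thread at a time may append to the shared buffer.
      mux.synchronize { deletes << "handle-#{t}-#{i}" }
    end
  end
end
threads.each(&:join)

puts deletes.length
```

Every identifier ends up in the buffer exactly once, regardless of how the threads interleave.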
#flush! ⇒ Object
Flushes all the accumulated commands to the queue connection. First the message sends are going to be flushed, then the message deletes. All of those will use batching where possible.
# File 'lib/sqewer/connection_messagebox.rb', line 50
def flush!
  @mux.synchronize do
    @connection.send_multiple_messages do |buffer|
      @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) }
    end

    @connection.delete_multiple_messages do |buffer|
      @deletes.each { |id| buffer.delete_message(id) }
    end

    (@sends.length + @deletes.length).tap { @sends.clear; @deletes.clear }
  end
end
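The last expression of flush! relies on Object#tap to return the number of flushed commands while also emptying both buffers. A small standalone sketch of that idiom, using made-up buffer contents:

```ruby
# Made-up buffer contents, shaped like the recorded sends and deletes.
sends = [["body-1", {}], ["body-2", { delay_seconds: 5 }]]
deletes = ["receipt-handle-1"]

# The sum is evaluated first; tap then runs the block and returns the sum,
# so the count reflects the buffers as they were before being cleared.
flushed_count = (sends.length + deletes.length).tap do
  sends.clear
  deletes.clear
end

puts flushed_count
puts sends.empty? && deletes.empty?
```

This is why a caller can use the return value of flush! as a count of commands that were handed to the connection, and why a second flush! with no new calls in between would return 0.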
#send_message(message_body, **kwargs_for_send) ⇒ Object
Saves the given body and the keyword arguments (such as delay_seconds) to be sent into the queue. If there are more sends in the same flush, they will be batched using batched sends.
# File 'lib/sqewer/connection_messagebox.rb', line 31
def send_message(message_body, **kwargs_for_send)
  @mux.synchronize {
    @sends << [message_body, kwargs_for_send]
  }
end
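The body and its keyword arguments are stored together as a pair so that the keywords can be splatted back when the call is replayed later. A minimal sketch of that record-and-replay pattern follows; `record` and `deliver` are hypothetical helpers for illustration, not part of the library.

```ruby
sends = []

# Hypothetical recorder: captures the body plus whatever keywords were given.
record = ->(body, **kwargs_for_send) { sends << [body, kwargs_for_send] }

record.call("hello", delay_seconds: 10)
record.call("world")

# Hypothetical receiver standing in for the eventual send call.
def deliver(body, delay_seconds: 0)
  "#{body} after #{delay_seconds}s"
end

# Replay: each stored hash is splatted back into keyword arguments.
results = sends.map { |body, kwargs| deliver(body, **kwargs) }
puts results.inspect
```

A call recorded without keywords is stored with an empty hash, so replaying it falls back to the receiver's defaults.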