Class: Sqrewdriver::Client
- Inherits:
-
Object
- Object
- Sqrewdriver::Client
- Defined in:
- lib/sqrewdriver/client.rb
Defined Under Namespace
Classes: SendingBuffer
Constant Summary collapse
- MAX_BATCH_SIZE =
10
- MAX_PAYLOAD_SIZE =
256 * 1024
Instance Method Summary collapse
- #flush(timeout = nil) ⇒ Object
- #flush_async ⇒ Object
-
#initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options) ⇒ Client
constructor
A new instance of Client.
-
#send_message_buffered(message) ⇒ Object
Add a message to buffer.
- #wait_flushing(timeout = nil) ⇒ Object
Constructor Details
#initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options) ⇒ Client
Returns a new instance of Client.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# Constructor listing for Sqrewdriver::Client (collapsed onto a single line in this page).
# As shown, it:
#   - uses the supplied `client` when one is given, otherwise creates an Aws::SQS::Client;
#   - stores `queue_url` and `flush_retry_count` in instance variables;
#   - creates a Concurrent::Array message buffer, a Concurrent::FixedThreadPool sized by
#     `threads`, a Concurrent::Set of waiting futures, and a Mutex;
#   - builds a SendingBuffer from the client, queue URL, serializer and thread pool.
# NOTE(review): the documented signature ends in `**options`, but this listing shows a bare
# `**` and an argument-less `Aws::SQS::Client.new()`. Identifiers appear to have been dropped
# when the page was extracted — confirm against lib/sqrewdriver/client.rb.
# NOTE(review): `@aggregate_messages_per = ensure_serializer_for_aggregation!(serializer)` may
# be two statements fused by a dropped identifier — verify before relying on it.
# File 'lib/sqrewdriver/client.rb', line 12 def initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **) if client @client = client else @client = Aws::SQS::Client.new() end @queue_url = queue_url @message_buffer = Concurrent::Array.new @thread_pool = Concurrent::FixedThreadPool.new(threads) @flush_retry_count = flush_retry_count @waiting_futures = Concurrent::Set.new @flush_mutex = Mutex.new @aggregate_messages_per = ensure_serializer_for_aggregation!(serializer) @sending_buffer = SendingBuffer.new(client: @client, queue_url: queue_url, serializer: serializer, thread_pool: @thread_pool) end |
Instance Method Details
#flush(timeout = nil) ⇒ Object
186 187 188 189 |
# File 'lib/sqrewdriver/client.rb', line 186
#
# Starts an asynchronous flush and then waits for it to finish.
#
# @param timeout [Numeric, nil] forwarded unchanged to #wait_flushing
# @return [Object] whatever #wait_flushing returns
def flush(timeout = nil)
  flush_async
  wait_flushing(timeout)
end
#flush_async ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# Listing for #flush_async (collapsed onto a single line in this page).
# As shown, it loops until @message_buffer is empty. Each pass shifts either
# @aggregate_messages_per items or a single item off @message_buffer and hands the result
# to @sending_buffer, then calls send_first_chunk_async whenever
# @sending_buffer.has_full_chunk? is true. After the loop it calls send_first_chunk_async
# once more.
# NOTE(review): identifiers are missing from this listing — the calls appear as
# `@sending_buffer.()` and a local assignment as `= @message_buffer.shift`. The text
# `if @aggregate_messages_per = @message_buffer.shift(...)` is probably a condition followed
# by an assignment whose left-hand name was dropped, not an assignment inside the condition.
# Confirm against lib/sqrewdriver/client.rb.
# File 'lib/sqrewdriver/client.rb', line 158 def flush_async until @message_buffer.empty? do if @aggregate_messages_per = @message_buffer.shift(@aggregate_messages_per) @sending_buffer.() else = @message_buffer.shift @sending_buffer.() end if @sending_buffer.has_full_chunk? send_first_chunk_async end end send_first_chunk_async end |
#send_message_buffered(message) ⇒ Object
Add a message to buffer.
If the count of buffered messages exceeds 10 or aggregate_messages_per, or if the sum of the message sizes exceeds 256KB, the payload is sent to SQS asynchronously.
37 38 39 40 41 42 43 |
# Listing for #send_message_buffered(message) (collapsed onto a single line in this page).
# As shown, it makes one call first, then calls flush_async when need_flush? is truthy.
# NOTE(review): the method name, its parameter, and the name and argument of the first call
# are all rendered as `()`, apparently dropped when the page was extracted. That first call
# is presumably what adds the message to the buffer — confirm against
# lib/sqrewdriver/client.rb.
# File 'lib/sqrewdriver/client.rb', line 37 def () () if need_flush? flush_async end end |
#wait_flushing(timeout = nil) ⇒ Object
176 177 178 179 180 181 182 183 184 |
# File 'lib/sqrewdriver/client.rb', line 176
#
# Waits for every future currently held in @waiting_futures to resolve.
#
# @param timeout [Numeric, nil] passed to the zipped future's #wait
# @raise [Sqrewdriver::SendMessageTimeout] when the wait returns a falsy value
# @raise [Sqrewdriver::SendMessageErrors] when the zipped future has a reason
# @return [nil] when the wait succeeds and no reason is present
def wait_flushing(timeout = nil)
  # Combine all outstanding futures into one so a single wait covers them.
  combined = Concurrent::Promises.zip_futures_on(@thread_pool, *@waiting_futures)

  finished = combined.wait(timeout)
  raise Sqrewdriver::SendMessageTimeout unless finished

  failures = combined.reason
  raise Sqrewdriver::SendMessageErrors.new(failures) if failures
end