Class: Sqrewdriver::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/sqrewdriver/client.rb

Defined Under Namespace

Classes: SendingBuffer

Constant Summary collapse

MAX_BATCH_SIZE =
10
MAX_PAYLOAD_SIZE =
256 * 1024

Instance Method Summary collapse

Constructor Details

#initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options) ⇒ Client

Returns a new instance of Client.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/sqrewdriver/client.rb', line 12

def initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options)
  if client
    @client = client
  else
    @client = Aws::SQS::Client.new(options)
  end

  @queue_url = queue_url
  @message_buffer = Concurrent::Array.new
  @thread_pool = Concurrent::FixedThreadPool.new(threads)
  @flush_retry_count = flush_retry_count
  @waiting_futures = Concurrent::Set.new
  @flush_mutex = Mutex.new
  @aggregate_messages_per = aggregate_messages_per

  ensure_serializer_for_aggregation!(serializer)

  @sending_buffer = SendingBuffer.new(client: @client, queue_url: queue_url, serializer: serializer, thread_pool: @thread_pool)
end

Instance Method Details

#flush(timeout = nil) ⇒ Object



186
187
188
189
# File 'lib/sqrewdriver/client.rb', line 186

def flush(timeout = nil)
  flush_async
  wait_flushing(timeout)
end

#flush_asyncObject



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/sqrewdriver/client.rb', line 158

def flush_async
  until @message_buffer.empty? do
    if @aggregate_messages_per
      messages = @message_buffer.shift(@aggregate_messages_per)
      @sending_buffer.add_aggregated_messages(messages)
    else
      message = @message_buffer.shift
      @sending_buffer.add_message(message)
    end

    if @sending_buffer.has_full_chunk?
      send_first_chunk_async
    end
  end

  send_first_chunk_async
end

#send_message_buffered(message) ⇒ Object

Add a message to buffer.

If count of buffered messages exceed 10 or aggregate_messages_per else if sum of message size exceeds 256KB, send payload to SQS asynchronously.



37
38
39
40
41
42
43
# File 'lib/sqrewdriver/client.rb', line 37

def send_message_buffered(message)
  add_message_to_buffer(message)

  if need_flush?
    flush_async
  end
end

#wait_flushing(timeout = nil) ⇒ Object



176
177
178
179
180
181
182
183
184
# File 'lib/sqrewdriver/client.rb', line 176

def wait_flushing(timeout = nil)
  zipped = Concurrent::Promises.zip_futures_on(@thread_pool, *@waiting_futures)
  unless zipped.wait(timeout)
    raise Sqrewdriver::SendMessageTimeout
  end

  exceptions = zipped.reason
  raise Sqrewdriver::SendMessageErrors.new(exceptions) if exceptions
end