Class: Sqrewdriver::Client::SendingBuffer
- Inherits:
-
Object
- Object
- Sqrewdriver::Client::SendingBuffer
- Includes:
- MonitorMixin
- Defined in:
- lib/sqrewdriver/client.rb
Defined Under Namespace
Classes: Chunk
Instance Attribute Summary collapse
-
#chunks ⇒ Object
readonly
Returns the value of attribute chunks.
Instance Method Summary collapse
- #add_aggregated_messages(messages) ⇒ Object
- #add_message(message) ⇒ Object
- #has_full_chunk? ⇒ Boolean
-
#initialize(client:, queue_url:, serializer:, thread_pool:) ⇒ SendingBuffer
constructor
A new instance of SendingBuffer.
- #send_first_chunk_async ⇒ Object
Constructor Details
#initialize(client:, queue_url:, serializer:, thread_pool:) ⇒ SendingBuffer
Returns a new instance of SendingBuffer.
68 69 70 71 72 73 74 75 |
# File 'lib/sqrewdriver/client.rb', line 68

# Builds an empty sending buffer and stores its collaborators.
#
# @param client [Object] kept in @client; not used inside this method
# @param queue_url [Object] kept in @queue_url; not used inside this method
# @param serializer [Object] kept in @serializer; the add_* methods call
#   `dump` on it
# @param thread_pool [Object] kept in @thread_pool; handed to
#   Concurrent::Promises.future_on by #send_first_chunk_async
def initialize(client:, queue_url:, serializer:, thread_pool:)
  # Explicit empty parentheses call the inherited initializer with no
  # arguments; a bare `super` would forward the keyword arguments.
  # NOTE(review): presumably this is what lets the included MonitorMixin
  # set up its monitor before `synchronize` is used -- confirm.
  super()
  @client = client
  @queue_url = queue_url
  @chunks = Concurrent::Array.new
  @serializer = serializer
  @thread_pool = thread_pool
end
Instance Attribute Details
#chunks ⇒ Object (readonly)
Returns the value of attribute chunks.
66 67 68 |
# File 'lib/sqrewdriver/client.rb', line 66

# Read-only accessor for the buffered chunks.
#
# @return [Object] the very object held in @chunks (no copy is made), so
#   callers observe later additions and removals
def chunks
  @chunks
end
Instance Method Details
#add_aggregated_messages(messages) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/sqrewdriver/client.rb', line 94 def () = [0] = .map { |m| m[:message_body] } serialized = @serializer.dump() [:message_body] = serialized add_size = (serialized, [:message_attributes]) synchronize do @chunks << Chunk.new if @chunks.empty? if @chunks.last.size == MAX_BATCH_SIZE || @chunks.last.bytesize + add_size > MAX_PAYLOAD_SIZE new_chunk = Chunk.new new_chunk.add(, add_size) @chunks << new_chunk else @chunks.last.add(, add_size) end end end |
#add_message(message) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/sqrewdriver/client.rb', line 77 def () serialized = @serializer.dump([:message_body]) [:message_body] = serialized add_size = (serialized, [:attributes]) synchronize do @chunks << Chunk.new if @chunks.empty? if @chunks.last.size == MAX_BATCH_SIZE || @chunks.last.bytesize + add_size > MAX_PAYLOAD_SIZE new_chunk = Chunk.new new_chunk.add(, add_size) @chunks << new_chunk else @chunks.last.add(, add_size) end end end |
#has_full_chunk? ⇒ Boolean
114 115 116 |
# File 'lib/sqrewdriver/client.rb', line 114

# Reports whether the buffer currently holds more than one chunk.
#
# The add_* methods only append a further chunk when the last one is at
# MAX_BATCH_SIZE or the next message would push it past MAX_PAYLOAD_SIZE,
# so a second chunk signals that the first one has stopped accepting
# messages.
#
# @return [Boolean] true when @chunks contains two or more chunks
def has_full_chunk?
  @chunks.size > 1
end
#send_first_chunk_async ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/sqrewdriver/client.rb', line 118 def send_first_chunk_async Concurrent::Promises.future_on(@thread_pool, @chunks) do |chunks| sending = synchronize { chunks.shift } if sending sending.data.each_with_index do |params, idx| params[:id] = idx.to_s params end (entries: sending.data) end end end |