Class: Sqrewdriver::Client::SendingBuffer

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/sqrewdriver/client.rb

Defined Under Namespace

Classes: Chunk

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client:, queue_url:, serializer:, thread_pool:) ⇒ SendingBuffer

Returns a new instance of SendingBuffer.



68
69
70
71
72
73
74
75
# File 'lib/sqrewdriver/client.rb', line 68

# Creates a buffer with an empty chunk list.
#
# All four keyword arguments are stored as-is in same-named instance
# variables; none of them is used or validated here.
#
# @param client [Object] stored as @client
# @param queue_url [Object] stored as @queue_url
# @param serializer [Object] stored as @serializer; the add_* methods call #dump on it
# @param thread_pool [Object] stored as @thread_pool; handed to the future in send_first_chunk_async
def initialize(client:, queue_url:, serializer:, thread_pool:)
  # Explicit empty parentheses so that none of the keyword arguments are
  # forwarded up the ancestor chain (the class mixes in MonitorMixin).
  super()
  @client, @queue_url = client, queue_url
  @chunks = Concurrent::Array.new
  @serializer, @thread_pool = serializer, thread_pool
end

Instance Attribute Details

#chunks ⇒ Object (readonly)

Returns the value of attribute chunks.



66
67
68
# File 'lib/sqrewdriver/client.rb', line 66

# Read-only accessor for the buffer's chunk list.
#
# Returns the very object held in @chunks (the constructor assigns a
# Concurrent::Array), not a copy, so changes made through the returned
# object are visible to the buffer.
#
# @return [Object] the current value of @chunks
def chunks
  @chunks
end

Instance Method Details

#add_aggregated_messages(messages) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/sqrewdriver/client.rb', line 94

# Collapses several messages into one buffered entry.
#
# The :message_body of every element is collected into an array, that array
# is serialized with @serializer.dump, and the result replaces the
# :message_body of the FIRST message, which is mutated in place and used as
# the entry that gets buffered. Only the first message's :message_attributes
# are passed to the size calculation.
#
# The entry goes into the last chunk unless that chunk already holds
# MAX_BATCH_SIZE entries or adding it would push the chunk past
# MAX_PAYLOAD_SIZE; in that case a new chunk is appended for it.
#
# @param messages [Array<Hash>] messages to aggregate; each is read via [:message_body]
# @return [Object, nil] nil when +messages+ is empty, otherwise whatever the
#   final buffering call inside the synchronized section returns
def add_aggregated_messages(messages)
  # Nothing to aggregate. Without this guard messages[0] is nil and the
  # body assignment below raises NoMethodError.
  return if messages.empty?

  base_message = messages.first
  serialized = @serializer.dump(messages.map { |m| m[:message_body] })
  base_message[:message_body] = serialized
  add_size = calculate_message_size(serialized, base_message[:message_attributes])

  synchronize do
    @chunks << Chunk.new if @chunks.empty?
    if @chunks.last.size == MAX_BATCH_SIZE || @chunks.last.bytesize + add_size > MAX_PAYLOAD_SIZE
      # The current chunk cannot take this entry: start a new one.
      new_chunk = Chunk.new
      new_chunk.add(base_message, add_size)
      @chunks << new_chunk
    else
      @chunks.last.add(base_message, add_size)
    end
  end
end

#add_message(message) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/sqrewdriver/client.rb', line 77

# Serializes one message and appends it to the buffer.
#
# The message's :message_body is replaced in place with the output of
# @serializer.dump, so the caller's hash is mutated. The entry's size is
# computed from the serialized body together with the message's
# :message_attributes -- the same key add_aggregated_messages reads.
#
# The message goes into the last chunk unless that chunk already holds
# MAX_BATCH_SIZE entries or adding it would push the chunk past
# MAX_PAYLOAD_SIZE; in that case a new chunk is appended for it.
#
# @param message [Hash] message parameters; :message_body and
#   :message_attributes are the keys read here
# @return [Object] whatever the final buffering call inside the synchronized
#   section returns
def add_message(message)
  serialized = @serializer.dump(message[:message_body])
  message[:message_body] = serialized
  # Was message[:attributes], which never matched the key the attributes are
  # stored under, so their bytes were left out of the payload-size check.
  add_size = calculate_message_size(serialized, message[:message_attributes])

  synchronize do
    @chunks << Chunk.new if @chunks.empty?
    if @chunks.last.size == MAX_BATCH_SIZE || @chunks.last.bytesize + add_size > MAX_PAYLOAD_SIZE
      # The current chunk cannot take this message: start a new one.
      new_chunk = Chunk.new
      new_chunk.add(message, add_size)
      @chunks << new_chunk
    else
      @chunks.last.add(message, add_size)
    end
  end
end

#has_full_chunk? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/sqrewdriver/client.rb', line 114

# Tells whether the buffer currently holds more than one chunk.
#
# The add_* methods append a further chunk only when the last one cannot
# accept the next message, so a second chunk implies the first one is done
# being filled.
#
# @return [Boolean] true when @chunks contains two or more chunks
def has_full_chunk?
  @chunks.length >= 2
end

#send_first_chunk_async ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/sqrewdriver/client.rb', line 118

# Schedules the oldest chunk for sending on @thread_pool.
#
# Inside the future, the first chunk is removed from the chunk list under
# the monitor. If there was one, each of its entries gets an :id equal to
# its position in the chunk (as a string) and the entries are passed to
# send_message_batch_with_retry. If the list was empty the future does
# nothing and its block evaluates to nil.
#
# @return [Object] whatever Concurrent::Promises.future_on returns
def send_first_chunk_async
  Concurrent::Promises.future_on(@thread_pool, @chunks) do |pending|
    # Only the removal from the shared list is done while holding the lock.
    batch = synchronize { pending.shift }
    next unless batch

    batch.data.each_with_index { |entry, position| entry[:id] = position.to_s }
    send_message_batch_with_retry(entries: batch.data)
  end
end