Class: Bricolage::SQSDataSource::DeleteMessageBuffer
- Inherits:
-
Object
- Object
- Bricolage::SQSDataSource::DeleteMessageBuffer
- Defined in:
- lib/bricolage/sqsdatasource.rb
Defined Under Namespace
Classes: Entry
Constant Summary collapse
- BATCH_SIZE_MAX =
SQS system limit
10
- MAX_RETRY_COUNT =
3
Instance Method Summary collapse
- #empty? ⇒ Boolean
- #flush(now = Time.now) ⇒ Object
-
#flush_force ⇒ Object
Flushes all delayed delete requests, including pending requests.
- #full? ⇒ Boolean
-
#initialize(sqs_client, url, logger) ⇒ DeleteMessageBuffer
constructor
A new instance of DeleteMessageBuffer.
- #put(msg) ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize(sqs_client, url, logger) ⇒ DeleteMessageBuffer
Returns a new instance of DeleteMessageBuffer.
214 215 216 217 218 219 |
# File 'lib/bricolage/sqsdatasource.rb', line 214 def initialize(sqs_client, url, logger) @sqs_client = sqs_client @url = url @logger = logger @buf = {} end |
Instance Method Details
#empty? ⇒ Boolean
227 228 229 |
# File 'lib/bricolage/sqsdatasource.rb', line 227 def empty? @buf.empty? end |
#flush(now = Time.now) ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/bricolage/sqsdatasource.rb', line 245 def flush(now = Time.now) entries = @buf.values.select {|ent| ent.issuable?(now) } return if entries.empty? @logger.info "flushing async delete requests" entries.each_slice(BATCH_SIZE_MAX) do |ents| res = @sqs_client.(queue_url: @url, entries: ents.map(&:request_params)) @logger.info "DeleteMessageBatch executed: #{res.successful.size} succeeded, #{res.failed.size} failed" issued_time = Time.now res.successful.each do |s| @buf.delete s.id end res.failed.each do |f| ent = @buf[f.id] unless ent @logger.error "[BUG] no corrensponding DeleteMessageBuffer entry: id=#{f.id}" next end ent.failed!(issued_time) if ent.too_many_failure? @logger.warn "DeleteMessage failure count exceeded the limit; give up: message_id=#{ent..}, receipt_handle=#{ent..receipt_handle}" @buf.delete f.id next end @logger.info "DeleteMessageBatch partially failed (#{ent.n_failure} times): sender_fault=#{f.sender_fault}, code=#{f.code}, message=#{f.}" end end end |
#flush_force ⇒ Object
Flushes all delayed delete requests, including pending requests
240 241 242 243 |
# File 'lib/bricolage/sqsdatasource.rb', line 240 def flush_force # retry continues in only 2m, now+1h must be after than all @next_issue_time flush(Time.now + 3600) end |
#full? ⇒ Boolean
231 232 233 |
# File 'lib/bricolage/sqsdatasource.rb', line 231 def full? @buf.size >= BATCH_SIZE_MAX end |
#put(msg) ⇒ Object
221 222 223 224 225 |
# File 'lib/bricolage/sqsdatasource.rb', line 221 def put(msg) ent = Entry.new(msg) @buf[ent.id] = ent flush if full? end |
#size ⇒ Object
235 236 237 |
# File 'lib/bricolage/sqsdatasource.rb', line 235 def size @buf.size end |