Class: BucketProcessor

Inherits:
Object
Defined in:
lib/s3-object-processor.rb

Instance Method Summary

Constructor Details

#initialize(key_id, key_secret, bucket, options = {}, &callback) ⇒ BucketProcessor

Returns a new instance of BucketProcessor.
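
For example, a minimal usage sketch (the credentials, bucket name and worker count are placeholders; the block is invoked once per listed key, as the source below shows):

require 's3-object-processor'

processor = BucketProcessor.new('KEY_ID', 'KEY_SECRET', 'my-bucket', workers: 4) do |bucket, key, reporter|
	# per-key work goes here; exceptions raised for a key are caught by the
	# worker and routed to the :failed_key report
	reporter.report :noop_key, key
end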



# File 'lib/s3-object-processor.rb', line 204

def initialize(key_id, key_secret, bucket, options = {}, &callback)
	protocol = options[:no_https] ? 'http' : 'https'
	port = options[:no_https] ? 80 : 443
	@log = options[:log] || Logger.new(STDERR)
	workers = options[:workers] || 10
	lister_fetch_size = options[:lister_fetch_size] || 200
	lister_backlog = options[:lister_backlog] || 1000
	max_keys = options[:max_keys]
	reporter_backlog = options[:reporter_backlog] || 1000
	reporter_summary_interval = options[:reporter_summary_interval] || 100
	reporter_average_contribution = options[:reporter_average_contribution] || 0.10
	custom_reports = options[:reports] || {} # report objects keyed by event symbol; used as a Hash below (each_value, [key])
	@key_list = options[:key_list]

	s3 = RightAws::S3.new(key_id, key_secret, multi_thread: true, logger: @log, protocol: protocol, port: port)
	bucket = s3.bucket(bucket)

	# bounded queue between the lister and the workers
	@key_queue = SizedQueue.new(lister_backlog)

	# the reporter aggregates per-key events from the lister and workers
	# into counters and a periodic summary log line
	@reporter = Reporter.new(reporter_backlog) do |reports|
		total_listed_keys = 0
		total_processed_keys = 0
		total_succeeded_keys = 0
		total_failed_keys = 0
		total_handled_keys = 0
		total_skipped_keys = 0
		total_nooped_keys = 0

		processed_avg = 0.0
		last_time = nil
		last_total = 0

		reports.each do |key, value|
			case key
			when :new_keys_count
				total_listed_keys += value
			when :processed_key
				total_processed_keys += 1
				if total_processed_keys % reporter_summary_interval == 0
					if last_time
						contribution = reporter_average_contribution
						new = (total_processed_keys - last_total).to_f / (Time.now.to_f - last_time)
						processed_avg = processed_avg * (1.0 - contribution) + new * contribution
					end
					last_time = Time.now.to_f
					last_total = total_processed_keys

					log_line = "-[%s]- processed %6d: failed: %6d (%6.2f %%) handled: %6d skipped: %6d (%6.2f %%)" % [
						value.to_s[0...2].ljust(2),
						total_processed_keys,
						total_failed_keys,
						total_failed_keys.to_f / total_processed_keys * 100,
						total_handled_keys,
						total_skipped_keys,
						total_skipped_keys.to_f / total_processed_keys * 100
					]
					log_line << custom_reports.each_value.map{|v| ' ' + v.to_s}.join
					log_line << " [backlog: %4d] @ %.1f op/s" % [
						@key_queue.size,
						processed_avg
					]

					@log.info log_line
				end
			when :succeeded_key
				total_succeeded_keys += 1
			when :failed_key
				key, error = *value
				@log.error "Key processing failed: #{key}: #{error.class.name}, #{error.message}"
				total_failed_keys += 1
			when :handled_key
				total_handled_keys += 1
			when :skipped_key
				total_skipped_keys += 1
			when :noop_key
				total_nooped_keys += 1
			else
				#@log.debug "custom report event: #{key}: #{value}"
				custom_reports[key].update(value)
			end
			#@log.debug("Report: #{key}: #{value}")
		end

		reports.on_finish do
			@log.info("total listed keys:                      #{total_listed_keys}")
			@log.info("total processed keys:                   #{total_processed_keys}")
			@log.info("total succeeded keys:                   #{total_succeeded_keys}")
			@log.info("total failed keys:                      #{total_failed_keys}")
			@log.info("total handled keys:                     #{total_handled_keys}")
			@log.info("total skipped keys:                     #{total_skipped_keys}")
			@log.info("total nooped keys:                      #{total_nooped_keys}")
			custom_reports.each_value do |report|
				@log.info report.final
			end
		end
	end

	# create lister
	@lister = if @key_list
		@log.info "processing #{@key_list.length} keys from list file"
		ListLister.new(bucket, @key_queue, max_keys)
	else
		Lister.new(bucket, @key_queue, lister_fetch_size, max_keys)
	end
	.on_keys_chunk do |keys_chunk|
		@log.debug "Got #{keys_chunk.length} new keys"
		@reporter.report(:new_keys_count, keys_chunk.length)
	end
	.on_finish do
		@log.debug "Done listing keys"
		# notify all workers that no more messages will be posted
		workers.times{ @key_queue << :end }
	end

	# create workers
	@log.info "Launching #{workers} workers"
	@workers = (1..workers).to_a.map do |worker_no|
		Worker.new(worker_no, @key_queue) do |key|
			@log.debug "Worker[#{worker_no}]: Processing key #{key}"
			yield bucket, key, @reporter
			@reporter.report :processed_key, key
			@reporter.report :succeeded_key, key
		end
		.on_error do |key, error|
			@reporter.report :processed_key, key
			@reporter.report :failed_key, [key, error]
		end
		.on_finish do
			@log.debug "Worker #{worker_no} done"
		end
	end
end
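
The :reports option is read as a Hash of report objects keyed by the event symbol passed to reporter.report: the listing above calls update on the matching object for each custom event, appends each object's to_s to the periodic summary line, and logs each object's final value once processing finishes. A minimal sketch of such a report object (the class name, the :copied_bytes event and the byte counting are illustrative, not part of the library):

class CopiedBytesReport
	def initialize
		@bytes = 0
	end

	# called with the value given to reporter.report :copied_bytes, bytes
	def update(bytes)
		@bytes += bytes
	end

	# appended to the periodic summary log line
	def to_s
		"copied #{@bytes} B"
	end

	# logged once when processing finishes
	def final
		"total copied bytes: #{@bytes}"
	end
end

Such an object would be wired in with reports: { copied_bytes: CopiedBytesReport.new } and fed from the processing block via reporter.report :copied_bytes, bytes.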

Instance Method Details

#run(prefix = nil) ⇒ Object
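
A usage sketch, assuming a processor constructed as in the example above (the prefix is a placeholder):

# list and process keys under the given prefix; call with no argument to walk
# the whole bucket (the prefix is ignored when a :key_list was passed to the
# constructor)
processor.run('some/prefix/')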



# File 'lib/s3-object-processor.rb', line 337

def run(prefix = nil)
	begin
		@reporter.run
		if @key_list
			@lister.run(@key_list)
		else
			@lister.run(prefix)
		end
		@workers.each(&:run)

		# wait for all to finish
		@workers.each(&:join)
		@log.info "All workers done"

		@lister.join
		@reporter.join
	rescue Interrupt => error
		@log.warn 'Interrupted'
		# unblock any thread blocked on the full key queue
		@key_queue.max = 999999
		@reporter.join
	end
end