Class: ConcurrentExecutor
- Inherits:
-
Object
- Object
- ConcurrentExecutor
- Defined in:
- lib/concurrent_executor.rb
Constant Summary collapse
- MAX_NUMBER_OF_THREADS =
100
Instance Attribute Summary collapse
-
#executor ⇒ Object
Returns the value of attribute executor.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#number_of_threads ⇒ Object
Returns the value of attribute number_of_threads.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#threads ⇒ Object
Returns the value of attribute threads.
-
#trace ⇒ Object
Returns the value of attribute trace.
Class Method Summary collapse
Instance Method Summary collapse
- #consume_enumerable(enum) ⇒ Object
- #graceful_shutdown ⇒ Object
-
#initialize(number_of_threads: 4, queue_size: 100, executor: nil, trace: true) ⇒ ConcurrentExecutor
constructor
A new instance of ConcurrentExecutor.
Constructor Details
#initialize(number_of_threads: 4, queue_size: 100, executor: nil, trace: true) ⇒ ConcurrentExecutor
Returns a new instance of ConcurrentExecutor.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/concurrent_executor.rb', line 9
#
# Builds the executor state (error queue, logger, bounded work queue) and
# then calls +start_threads+ (defined elsewhere in the class).
#
# @param number_of_threads [Integer] worker count; must lie within
#   1..MAX_NUMBER_OF_THREADS
# @param queue_size [Integer] capacity handed to +SizedQueue.new+; must be
#   truthy
# @param executor [Object, nil] stored as-is in the +executor+ attribute
# @param trace [Boolean] stored as-is in the +trace+ attribute
# @raise [RuntimeError] when +queue_size+ is nil/false or
#   +number_of_threads+ is outside the accepted range
def initialize(number_of_threads: 4, queue_size: 100, executor: nil, trace: true)
  raise 'queue must be sized' unless queue_size
  # The lower bound is 1, not 0: with zero workers nothing would ever drain
  # the queue, which contradicts the "> 0" contract stated in the message.
  raise 'number of threads must be > 0' unless (1..MAX_NUMBER_OF_THREADS).cover?(number_of_threads)

  @errors = Queue.new
  @errored = false

  # Prefer a host-application logger when one is available, else STDERR.
  self.logger = if defined?(Rails)
                  Rails.logger
                elsif defined?(App) && App.respond_to?(:logger)
                  App.logger
                else
                  Logger.new(STDERR)
                end

  self.threads = []
  self.executor = executor
  self.queue = SizedQueue.new(queue_size)
  self.number_of_threads = number_of_threads
  self.trace = trace

  # Must run last: everything above has to be in place before workers start.
  start_threads
end
Instance Attribute Details
#executor ⇒ Object
Returns the value of attribute executor.
7 8 9 |
# File 'lib/concurrent_executor.rb', line 7
#
# Reader for the +executor+ attribute.
#
# @return [Object, nil] the current value of +@executor+; the constructor
#   assigns its +executor:+ argument here (+nil+ by default)
def executor
  @executor
end
#logger ⇒ Object
Returns the value of attribute logger.
7 8 9 |
# File 'lib/concurrent_executor.rb', line 7
#
# Reader for the +logger+ attribute.
#
# @return [Object] the current value of +@logger+; the constructor sets it
#   to +Rails.logger+, +App.logger+ or a +Logger+ writing to STDERR
def logger
  @logger
end
#number_of_threads ⇒ Object
Returns the value of attribute number_of_threads.
7 8 9 |
# File 'lib/concurrent_executor.rb', line 7
#
# Reader for the +number_of_threads+ attribute.
#
# @return [Object] the current value of +@number_of_threads+; the
#   constructor assigns its validated +number_of_threads:+ argument here
def number_of_threads
  @number_of_threads
end
#queue ⇒ Object
Returns the value of attribute queue.
7 8 9 |
# File 'lib/concurrent_executor.rb', line 7
#
# Reader for the +queue+ attribute.
#
# @return [Object] the current value of +@queue+; the constructor assigns a
#   +SizedQueue+ of the requested capacity here
def queue
  @queue
end
#threads ⇒ Object
Returns the value of attribute threads.
7 8 9 |
# File 'lib/concurrent_executor.rb', line 7
#
# Reader for the +threads+ attribute.
#
# @return [Object] the current value of +@threads+; the constructor
#   initialises it to an empty Array, and +graceful_shutdown+ joins every
#   element of it
def threads
  @threads
end
#trace ⇒ Object
Returns the value of attribute trace.
7 8 9 |
# File 'lib/concurrent_executor.rb', line 7
#
# Reader for the +trace+ attribute.
#
# @return [Object] the current value of +@trace+; the constructor assigns
#   its +trace:+ argument here (+true+ by default)
def trace
  @trace
end
Class Method Details
.consume_enumerable(enum, **args, &blk) ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/concurrent_executor.rb', line 40
#
# One-shot helper: builds an instance whose +executor+ is the given block,
# feeds it +enum+, and always attempts a graceful shutdown afterwards.
# NOTE(review): documented as a class method although the listing has no
# +self.+ prefix - presumably defined inside +class << self+; confirm.
#
# @param enum [#each] items handed to the instance-level +consume_enumerable+
# @param args [Hash] keyword options forwarded to +new+; any +:executor+
#   entry is overridden by the block
# @yield the block stored as the instance's +executor+
# @return [Object] whatever the instance-level +consume_enumerable+ returns
# @raise [StandardError] re-raises any error from construction or
#   consumption after printing it; +graceful_shutdown+ may also raise
def consume_enumerable(enum, **args, &blk)
  runner = new(**args, executor: blk)
  runner.consume_enumerable(enum)
rescue StandardError => error
  puts error
  raise
ensure
  # +runner+ is nil when +new+ itself failed, hence the safe navigation.
  runner&.graceful_shutdown
end
Instance Method Details
#consume_enumerable(enum) ⇒ Object
33 34 35 36 37 |
# File 'lib/concurrent_executor.rb', line 33
#
# Pushes every element yielded by +enum+ onto the work queue, blocking
# whenever the queue is full.
#
# @param enum [#each] source of work items; when it yields several values
#   per iteration only the first one is enqueued
# @return [Object] the result of +enum.each+, or of +logger.warn+ when the
#   queue was closed mid-iteration
def consume_enumerable(enum)
  # An explicit one-argument block is deliberate: handing +queue.method(:push)+
  # straight to +each+ forwards any extra yielded value (e.g. the index from
  # +each_with_index+) as +push+'s +non_block+ flag, which turns the blocking
  # push into one that raises ThreadError as soon as the queue is full.
  enum.each { |item| queue.push(item) }
rescue ClosedQueueError
  logger.warn 'Queue closed during iteration'
end
#graceful_shutdown ⇒ Object
51 52 53 54 55 |
# File 'lib/concurrent_executor.rb', line 51
#
# Closes the work queue, waits for every thread in +threads+ to finish and
# then surfaces the first recorded error, if any.
#
# @return [nil] when no error was recorded in +@errors+
# @raise [Exception] the first object popped from +@errors+ when it is not
#   empty
def graceful_shutdown
  queue.close
  threads.each(&:join)
  return if @errors.empty?

  # Cannot block here: the queue was just checked to be non-empty.
  raise @errors.pop(false)
end