Class: ConcurrentExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_executor.rb

Constant Summary collapse

MAX_NUMBER_OF_THREADS =
100

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(number_of_threads: 4, queue_size: 100, executor: nil, trace: true) ⇒ ConcurrentExecutor

Returns a new instance of ConcurrentExecutor.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/concurrent_executor.rb', line 9

def initialize(number_of_threads: 4, queue_size: 100, executor: nil, trace: true)
  raise 'queue must be sized' unless queue_size
  raise 'number of threads must be > 0' unless (0..MAX_NUMBER_OF_THREADS).cover?(number_of_threads)

  @errors = Queue.new
  @errored = false

  self.logger = if defined?(Rails)
                  Rails.logger
                elsif defined?(App) && App.respond_to?(:logger)
                  App.logger
                else
                  Logger.new(STDERR)
                end

  self.threads = []
  self.executor = executor
  self.queue = SizedQueue.new(queue_size)
  self.number_of_threads = number_of_threads
  self.trace = trace

  start_threads
end

Instance Attribute Details

#executorObject

Returns the value of attribute executor.



7
8
9
# File 'lib/concurrent_executor.rb', line 7

def executor
  @executor
end

#loggerObject

Returns the value of attribute logger.



7
8
9
# File 'lib/concurrent_executor.rb', line 7

def logger
  @logger
end

#number_of_threadsObject

Returns the value of attribute number_of_threads.



7
8
9
# File 'lib/concurrent_executor.rb', line 7

def number_of_threads
  @number_of_threads
end

#queueObject

Returns the value of attribute queue.



7
8
9
# File 'lib/concurrent_executor.rb', line 7

def queue
  @queue
end

#threadsObject

Returns the value of attribute threads.



7
8
9
# File 'lib/concurrent_executor.rb', line 7

def threads
  @threads
end

#traceObject

Returns the value of attribute trace.



7
8
9
# File 'lib/concurrent_executor.rb', line 7

def trace
  @trace
end

Class Method Details

.consume_enumerable(enum, **args, &blk) ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/concurrent_executor.rb', line 40

def consume_enumerable(enum, **args, &blk)
  executor = new(**args.merge(executor: blk))
  executor.consume_enumerable(enum)
rescue StandardError => e
  puts e
  raise e
ensure
  executor&.graceful_shutdown
end

Instance Method Details

#consume_enumerable(enum) ⇒ Object



33
34
35
36
37
# File 'lib/concurrent_executor.rb', line 33

def consume_enumerable(enum)
  enum.each(&queue.method(:push))
rescue ClosedQueueError
  logger.warn 'Queue closed during iteration'
end

#graceful_shutdownObject



51
52
53
54
55
# File 'lib/concurrent_executor.rb', line 51

def graceful_shutdown
  queue.close
  threads.map(&:join)
  raise @errors.pop(false) unless @errors.empty?
end