Class: Puli

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/puli.rb

Defined Under Namespace

Classes: Task

Constant Summary collapse

VERSION =
'1.0.3'

Instance Method Summary collapse

Constructor Details

#initialize(num_threads: 3, tasks: []) ⇒ Puli

Returns a new instance of Puli.



10
11
12
13
14
15
# File 'lib/puli.rb', line 10

def initialize(num_threads: 3, tasks: [])
  @num_threads = num_threads.to_i
  @q = Queue.new
  @index = 0
  tasks.map{|t| self << t }
end

Instance Method Details

#<<(task_payload) ⇒ Object



17
18
19
20
# File 'lib/puli.rb', line 17

def <<(task_payload)
  @index += 1
  @q << Task.new(task_payload, @index)
end

#eachObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/puli.rb', line 32

def each
  last_captured_error = false
  in_execution_order = {}
  threads = (1..@num_threads.to_i).map do
    Thread.new do
      loop do
        break if last_captured_error
        begin
          task = @q.pop(non_block=true)
          yield(task.payload, index: task.index)
        rescue ThreadError
          break # Queue emptied
        rescue Exception => e
          last_captured_error = e
        end
      end
    end
  end
  threads.map(&:join)
  raise last_captured_error if last_captured_error
  self
end

#mapObject



22
23
24
25
26
27
28
29
30
# File 'lib/puli.rb', line 22

def map
  in_execution_order = []
  mux = Mutex.new
  each do |*payload, index:|
    task_result = yield(*payload)
    mux.synchronize { in_execution_order << Task.new(task_result, index) }
  end
  in_execution_order.sort_by(&:index).map(&:payload)
end