22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/threadify.rb', line 22
def threadify(*args, &block)
opts = args.last.is_a?(Hash) ? args.pop : {}
opts.keys.each{|key| opts[key.to_s.to_sym] = opts.delete(key)}
opts[:threads] ||= (Numeric === args.first ? args.shift : Threadify.threads)
opts[:strategy] ||= (args.empty? ? Threadify.strategy : args)
threads = Integer(opts[:threads])
strategy = opts[:strategy]
done = Object.new.freeze
nothing = done
jobs = Array.new(threads).map{ [] }
top = Thread.current
i = 0
send(*strategy){|*args| jobs[i % threads].push([args, i]); i += 1}
threads.times{|i| jobs[i].push(done)}
consumers = Array.new threads
thrownv = Hash.new
thrownq = Queue.new
caught = false
catcher = Thread.new do
loop do
thrown = thrownq.pop
break if thrown == done
i, thrown = thrown
thrownv[i] = thrown
caught = true
end
end
threads.times do |i|
consumers[i] = Thread.new(jobs[i]) do |jobsi|
this = Thread.current
this.abort_on_exception = Threadify.abort_on_exception
job = nil
thrown =
catch(:threadify) do
loop{
break if caught
job = jobsi.shift
break if job == done
args = job.first
jobsi << (job << block.call(*args))
}
nothing
end
unless nothing == thrown
thrownq.push [i, thrown]
args, i = job
end
end
end
consumers.map{|t| t.join}
thrownq.push done
catcher.join
unless thrownv.empty?
key = thrownv.keys.sort.first
return thrownv[key]
end
ret = []
jobs.each do |results|
results.each do |result|
break if result == done
elem, i, value = result
ret[i] = value
end
end
ret
end
|