Module: Parallel

Defined in:
lib/parallel.rb,
lib/parallel/version.rb

Defined Under Namespace

Classes: DeadWorker, ExceptionWrapper, Worker

Constant Summary collapse

VERSION =
Version = '0.6.2'

Class Method Summary collapse

Class Method Details

.each(array, options = {}, &block) ⇒ Object



77
78
79
80
# File 'lib/parallel.rb', line 77

# Feeds every element of +array+ through +map+, adding
# :preserve_results => false to the caller's options, and then returns the
# +array+ argument itself (whatever +map+ returned is discarded).
def each(array, options = {}, &block)
  each_options = options.merge(:preserve_results => false)
  map(array, each_options, &block)
  array
end

.each_with_index(array, options = {}, &block) ⇒ Object



82
83
84
# File 'lib/parallel.rb', line 82

# Delegates to +each+ with :with_index => true merged into the options;
# the return value is whatever +each+ returns.
def each_with_index(array, options = {}, &block)
  indexed_options = options.merge(:with_index => true)
  each(array, indexed_options, &block)
end

.in_processes(options = {}, &block) ⇒ Object



71
72
73
74
75
# File 'lib/parallel.rb', line 71

# Runs the block once per worker index by mapping over 0...count.
# The count comes from extract_count_from_options; when that yields
# nil/false, processor_count is used instead. The chosen count is also
# passed on to +map+ as the :in_processes option.
def in_processes(options = {}, &block)
  requested, options = extract_count_from_options(options)
  workers = requested || processor_count
  map(0...workers, options.merge(:in_processes => workers), &block)
end

.in_threads(options = {:count => 2}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/parallel.rb', line 54

# Starts one thread per index in 0...count, yielding the index to the
# caller's block inside each thread. Every thread stores its block result
# in the slot matching its index, the threads are handed to
# wait_for_threads, and the array of results is returned.
def in_threads(options = {:count => 2})
  count, = extract_count_from_options(options)

  results = []
  workers = count.times.map do |index|
    Thread.new { results[index] = yield(index) }
  end

  wait_for_threads(workers)
  results
end

.map(array, options = {}, &block) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/parallel.rb', line 86

# Dispatches the block over +array+ using threads or processes.
#
# * With a truthy options[:in_threads], that value is the requested number
#   of workers and the work goes to work_in_threads.
# * Otherwise options[:in_processes] (or processor_count when it is unset)
#   is the requested number and the work goes to work_in_processes.
#
# The worker count is capped at the number of elements and passed on as
# :count. When it ends up being 0, work_direct is called with the original
# options instead.
def map(array, options = {}, &block)
  items = array.to_a # Ranges and other Enumerables become plain Arrays

  use_threads = options[:in_threads] ? true : false
  requested =
    if use_threads
      options[:in_threads]
    else
      options[:in_processes] || processor_count
    end
  workers = [items.size, requested].min

  return work_direct(items, options, &block) if workers == 0

  worker_options = options.merge(:count => workers)
  if use_threads
    work_in_threads(items, worker_options, &block)
  else
    work_in_processes(items, worker_options, &block)
  end
end

.map_with_index(array, options = {}, &block) ⇒ Object



107
108
109
# File 'lib/parallel.rb', line 107

# Delegates to +map+ with :with_index => true merged into the options and
# returns whatever +map+ returns.
def map_with_index(array, options = {}, &block)
  indexed_options = options.merge(:with_index => true)
  map(array, indexed_options, &block)
end

.physical_processor_count ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/parallel.rb', line 134

# Number of physical CPU cores on this host, memoized in
# @physical_processor_count.
#
# Detection depends on RbConfig::CONFIG['host_os']:
# * darwin1x / freebsd: `sysctl -n hw.physicalcpu`
# * linux: number of distinct (physical id, core id) pairs listed in
#   /proc/cpuinfo, so hyper-threaded siblings are counted once and every
#   package contributes its cores
# * mswin / mingw: sum of NumberOfCores over all Win32_Processor rows
#
# On any other OS, or whenever detection does not produce a positive
# number (field missing, command printed nothing), processor_count is
# returned instead of a bogus 0.
def physical_processor_count
  @physical_processor_count ||= begin
    detected = case RbConfig::CONFIG['host_os']
    when /darwin1/, /freebsd/
      `sysctl -n hw.physicalcpu`.to_i
    when /linux/
      cpuinfo = File.readable?('/proc/cpuinfo') ? File.read('/proc/cpuinfo') : ''
      cores = {} # "physical id:core id" => true, one entry per physical core
      package = nil
      # Remember the most recent "physical id" and pair it with each
      # "core id" that follows it.
      cpuinfo.scan(/^(physical id|core id)\s*:\s*(\d+)/) do |field, value|
        if field == 'physical id'
          package = value
        else
          cores["#{package}:#{value}"] = true
        end
      end
      cores.size
    when /mswin|mingw/
      require 'win32ole'
      wmi = WIN32OLE.connect("winmgmts://")
      # Select the property that is actually read, and add up every row
      # rather than looking only at the first one.
      cpus = wmi.ExecQuery("select NumberOfCores from Win32_Processor")
      cpus.to_enum.inject(0) { |total, cpu| total + cpu.NumberOfCores.to_i }
    else
      0 # unknown OS: handled by the fallback below
    end

    detected > 0 ? detected : processor_count
  end
end

.processor_count ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/parallel.rb', line 111

# Number of processors reported by the host OS, memoized in
# @processor_count.
#
# Detection depends on RbConfig::CONFIG['host_os']:
# * darwin9: `hwprefs cpu_count`
# * other darwin: `hwprefs thread_count` when hwprefs_available? says so,
#   otherwise `sysctl -n hw.ncpu`
# * linux / cygwin: number of lines in /proc/cpuinfo that start with
#   "processor"
# * openbsd / freebsd: `sysctl -n hw.ncpu`
# * mswin / mingw: sum of NumberOfLogicalProcessors over all
#   Win32_Processor rows
# * solaris2: `psrinfo -p`
#
# Any other OS prints a warning to $stderr and is assumed to have 1.
def processor_count
  @processor_count ||= case RbConfig::CONFIG['host_os']
  when /darwin9/
    `hwprefs cpu_count`.to_i
  when /darwin/
    (hwprefs_available? ? `hwprefs thread_count` : `sysctl -n hw.ncpu`).to_i
  when /linux|cygwin/
    # Anchored so that only per-CPU entries are counted, not every line
    # that merely contains the word "processor".
    `grep -c '^processor' /proc/cpuinfo`.to_i
  when /(open|free)bsd/
    `sysctl -n hw.ncpu`.to_i
  when /mswin|mingw/
    require 'win32ole'
    wmi = WIN32OLE.connect("winmgmts://")
    cpus = wmi.ExecQuery("select NumberOfLogicalProcessors from Win32_Processor")
    # Add up every returned row instead of reading only the first one.
    cpus.to_enum.inject(0) { |total, cpu| total + cpu.NumberOfLogicalProcessors.to_i }
  when /solaris2/
    `psrinfo -p`.to_i # this is physical cpus afaik
  else
    $stderr.puts "Unknown architecture ( #{RbConfig::CONFIG["host_os"]} ) assuming one processor."
    1
  end
end