Class: XPool

Inherits:
Object
  • Object
show all
Defined in:
lib/xpool.rb,
lib/xpool/version.rb

Defined Under Namespace

Classes: Process

Constant Summary collapse

VERSION =
"0.9.0.2"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = number_of_cpu_cores) ⇒ XPool

Parameters:

  • size (Fixnum) (defaults to: number_of_cpu_cores)

    The number of subprocesses to spawn. Defaults to the number of cores on your CPU.



40
41
42
# File 'lib/xpool.rb', line 40

# Builds the pool by creating one Process object per requested slot.
#
# @param size [Fixnum] how many Process objects to create; when omitted the
#   value comes from #number_of_cpu_cores.
def initialize(size=number_of_cpu_cores)
  @pool = Array.new(size) do |_slot|
    Process.new
  end
end

Class Method Details

.debug ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/xpool.rb', line 9

# Reads the debug flag, or enables debugging for the duration of a block.
#
# Without a block the current value of the flag is returned unchanged.
#
# With a block the flag is switched on, the block is run, and afterwards the
# flag goes back to the state it had before the call, even when the block
# raises. Debugging that was already enabled (for example by an outer
# `debug { ... }` or by `debug = true`) is therefore left enabled instead of
# being switched off as a side effect.
#
# @yield runs with the debug flag set to true.
# @return [Object] the block's return value, or the debug flag when no block
#   is given.
def self.debug
  return @debug unless block_given?

  previously_enabled = @debug
  begin
    @debug = true
    yield
  ensure
    # A previously unset (nil) or false flag ends up as false, as before;
    # a previously enabled flag is preserved.
    @debug = previously_enabled || false
  end
end

.debug=(boolean) ⇒ Object



22
23
24
# File 'lib/xpool.rb', line 22

# Sets the class-level debug flag.
#
# @param boolean [Boolean] the new value; it is stored as given, without
#   coercion, in the @debug class instance variable that .log checks.
# @return [Object] the assigned value.
def self.debug=(boolean)
  @debug = boolean
end

.log(msg, type = :info) ⇒ Object



26
27
28
29
30
31
# File 'lib/xpool.rb', line 26

# Writes a message through the shared logger, but only while debugging is on.
#
# The logger is created on first use (writing to STDOUT) and then reused.
#
# @param msg [Object] the message handed to the logger.
# @param type [Symbol] name of the logger method to call (defaults to :info).
# @return [Object, nil] whatever the logger method returns, or nil when the
#   debug flag is not set.
def self.log(msg, type = :info)
  @logger ||= Logger.new(STDOUT)
  @logger.public_send(type, msg) if @debug
end

Instance Method Details

#broadcast(unit, *args) ⇒ Array<XPool::Process>

Broadcasts unit to be run across all subprocesses in the pool.

Examples:

pool = XPool.new 5
pool.broadcast unit
pool.shutdown

Returns:

  • (Array<XPool::Process>)

    Returns an array of XPool::Process objects

Raises:

  • (RuntimeError)

    When a subprocess in the pool is dead.



66
67
68
69
70
# File 'lib/xpool.rb', line 66

# Hands the same unit of work, with the same arguments, to every entry in
# the pool.
#
# @param unit [Object] the unit of work passed to each process's #schedule.
# @param args [Array<Object>] extra arguments forwarded unchanged.
# @return [Array] one #schedule result per entry in the pool, in pool order.
def broadcast(unit, *args)
  @pool.map { |worker| worker.schedule(unit, *args) }
end

#dry? ⇒ Boolean

Returns true when all subprocesses in the pool are busy.

Returns:

  • (Boolean)

    Returns true when all subprocesses in the pool are busy.



161
162
163
# File 'lib/xpool.rb', line 161

# Tells whether every entry in the pool reports itself as busy.
#
# @return [Boolean] true when no entry answers falsy to #busy? (which is
#   also the case for an empty pool).
def dry?
  @pool.all? { |worker| worker.busy? }
end

#failed_processes ⇒ Array<XPool::Process>

Returns an Array of failed processes.

Returns:

  • (Array<XPool::Process>)

    Returns an Array of failed processes.



48
49
50
# File 'lib/xpool.rb', line 48

# Collects the pool entries that report themselves as failed.
#
# @return [Array] the entries whose #failed? is truthy, in pool order.
def failed_processes
  @pool.select { |worker| worker.failed? }
end

#number_of_cpu_cores ⇒ Object

Count the number of CPU cores available.



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/xpool.rb', line 168

# Counts the CPU cores of the host, choosing a strategy from the host OS
# name reported by RbConfig.
#
# * linux:       counts the cpuN entries under /sys/devices/system/cpu
# * darwin, bsd: parses the output of `sysctl -n hw.ncpu`
# * solaris:     parses the line count of a kstat/grep/uniq pipeline
# * otherwise:   falls back to the fixed value 5
#
# @return [Integer] the detected (or fallback) number of cores.
def number_of_cpu_cores
  host_os = RbConfig::CONFIG['host_os']

  if host_os =~ /linux/
    Dir.glob('/sys/devices/system/cpu/cpu[0-9]*').count
  elsif host_os =~ /darwin|bsd/
    Integer(`sysctl -n hw.ncpu`)
  elsif host_os =~ /solaris/
    Integer(`kstat -m cpu_info | grep -w core_id | uniq | wc -l`)
  else
    5
  end
end

#resize!(range) ⇒ void

This method returns an undefined value.

Resize the pool. All subprocesses in the pool are abruptly stopped through #shutdown! and a new pool the size of range is created.

Examples:

pool = XPool.new 5
pool.resize! 1..3
pool.shutdown

Parameters:

  • range (Range)

    The new size of the pool.



121
122
123
124
# File 'lib/xpool.rb', line 121

# Replaces the pool: first forces the current pool down via #shutdown!, then
# builds a fresh pool with one Process object per element of the range.
#
# @param range [Range] its number of elements becomes the new pool size.
# @return [void]
def resize!(range)
  shutdown!
  new_size = range.to_a.size
  @pool = Array.new(new_size) { Process.new }
end

#schedule(unit, *args) ⇒ XPool::Process

Dispatch a unit of work in a subprocess.

Parameters:

  • unit (#run)

    The unit of work

  • *args (Object)

    A variable number of arguments to be passed to #run

Returns:

  • (XPool::Process)

Raises:

  • (RuntimeError)

    When the pool is dead (no subprocesses are left running)



138
139
140
141
142
143
144
145
# File 'lib/xpool.rb', line 138

# Hands a unit of work to the least-used process that is not dead.
#
# "Least-used" means the lowest #frequency among the pool entries whose
# #dead? is falsy.
#
# @param unit [Object] the unit of work passed to the chosen process.
# @param args [Array<Object>] extra arguments forwarded unchanged.
# @return [Object] whatever the chosen process's #schedule returns.
# @raise [RuntimeError] when #size reports zero alive subprocesses.
def schedule(unit,*args)
  # dead pool
  raise RuntimeError, "cannot schedule unit of work on a dead pool" if size == 0

  candidates = @pool.reject { |worker| worker.dead? }
  least_used = candidates.min_by(&:frequency)
  least_used.schedule(unit, *args)
end

#shutdown(timeout = nil) ⇒ void

This method returns an undefined value.

A graceful shutdown of the pool. Each subprocess in the pool empties its queue and exits normally.

Parameters:

  • timeout (Fixnum) (defaults to: nil)

    An optional amount of seconds to wait before forcing a shutdown through #shutdown!.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/xpool.rb', line 82

# Asks every entry in the pool to shut down, optionally under a time limit.
#
# Without a timeout each entry's #shutdown is simply called in turn. With a
# timeout the same loop runs inside Timeout.timeout; if that raises
# Timeout::Error a message is logged and the pool is stopped through
# #shutdown! instead.
#
# @param timeout [Fixnum, nil] seconds to allow before switching to
#   #shutdown!; nil (the default) means no limit.
# @return [void]
def shutdown(timeout=nil)
  return @pool.each { |worker| worker.shutdown } unless timeout

  # The rescue is scoped to the timed branch only, so an untimed shutdown
  # never falls back to the hard shutdown.
  begin
    Timeout.timeout(timeout) { @pool.each { |worker| worker.shutdown } }
  rescue Timeout::Error
    XPool.log "'#{timeout}' seconds elapsed, switching to hard shutdown."
    shutdown!
  end
end

#shutdown! ⇒ void

This method returns an undefined value.

A forceful shutdown of the pool (through SIGKILL).



102
103
104
# File 'lib/xpool.rb', line 102

# Calls #shutdown! on every entry in the pool, in pool order.
#
# @return [void]
def shutdown!
  @pool.each { |worker| worker.shutdown! }
end

#size ⇒ Fixnum

Returns the number of alive subprocesses in the pool.

Returns:

  • (Fixnum)

    Returns the number of alive subprocesses in the pool.



151
152
153
154
155
# File 'lib/xpool.rb', line 151

# Counts the pool entries that report themselves as alive.
#
# @return [Fixnum] how many entries answer truthy to #alive?.
def size
  @pool.count(&:alive?)
end