Class: MultiProcess::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/multi_process/group.rb

Overview

Store and run a group of processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(receiver: nil, partition: nil) ⇒ Group

Create new process group.

Parameters:

  • opts (Hash)

    Options

  • otps (Hash)

    a customizable set of options



27
28
29
30
31
32
# File 'lib/multi_process/group.rb', line 27

def initialize(receiver: nil, partition: nil)
  @processes = []
  @receiver  = receiver || MultiProcess::Logger.global
  @partition = partition ? partition.to_i : 0
  @mutex     = Mutex.new
end

Instance Attribute Details

#partitionObject (readonly)

Partition size.



19
20
21
# File 'lib/multi_process/group.rb', line 19

def partition
  @partition
end

#processesObject (readonly)

Return list of processes.



10
11
12
# File 'lib/multi_process/group.rb', line 10

def processes
  @processes
end

#receiverObject

Receiver all processes in group should use.

If changed only affect new added processes.



16
17
18
# File 'lib/multi_process/group.rb', line 16

def receiver
  @receiver
end

Instance Method Details

#<<(procs) ⇒ Object

Add new process or list of processes.

If group was already started added processes will also be started.

Parameters:



40
41
42
43
44
45
46
47
# File 'lib/multi_process/group.rb', line 40

def <<(procs)
  Array(procs).flatten.each do |process|
    processes << process
    process.receiver = receiver

    process.start if started?
  end
end

#alive?Boolean

Check if group is alive e.g. if at least on process is alive.

Returns:

  • (Boolean)

    True if group is alive.



151
152
153
# File 'lib/multi_process/group.rb', line 151

def alive?
  processes.any?(&:alive?)
end

#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object

Wait until group is available. This implies waiting until all processes in group are available.

Processes will not be stopped if timeout occurs.

Parameters:

  • opts (Hash)

    Options.



171
172
173
174
175
# File 'lib/multi_process/group.rb', line 171

def available!(timeout: MultiProcess::DEFAULT_TIMEOUT)
  Timeout.timeout timeout do
    processes.each(&:available!)
  end
end

#available?Boolean

Check if group is available. The group is available if all processes are available.

Returns:

  • (Boolean)


158
159
160
# File 'lib/multi_process/group.rb', line 158

def available?
  processes.all?(:available?)
end

#run(delay: nil, timeout: nil) ⇒ Object

Start all process and wait for them to terminate.

Given options will be passed to #start and #wait. #start will only be called if partition is zero.

If timeout is given process will be terminated using #stop when timeout error is raised.



115
116
117
118
119
120
121
122
123
124
# File 'lib/multi_process/group.rb', line 115

def run(delay: nil, timeout: nil)
  if partition.positive?
    run_partition(&:run)
  else
    start(delay: delay)
    wait(timeout: timeout)
  end
ensure
  stop
end

#run!(delay: nil, timeout: nil) ⇒ Object

Start all process and wait for them to terminate.

Given options will be passed to #start and #wait. #start will only be called if partition is zero.

If timeout is given process will be terminated using #stop when timeout error is raised.

An error will be raised if any process exits unsuccessfully.



136
137
138
139
140
141
142
143
144
145
# File 'lib/multi_process/group.rb', line 136

def run!(delay: nil, timeout: nil)
  if partition.positive?
    run_partition(&:run!)
  else
    start(delay: delay)
    wait!(timeout: timeout)
  end
ensure
  stop
end

#start(delay: nil) ⇒ Object

Start all process in group.

Call blocks until all processes are started.

Parameters:

  • delay (Hash) (defaults to: nil)

    a customizable set of options

Options Hash (delay:):

  • Delay (Integer)

    in seconds between starting processes.



55
56
57
58
59
60
61
62
# File 'lib/multi_process/group.rb', line 55

def start(delay: nil)
  processes.each do |process|
    next if process.started?

    process.start
    sleep delay if delay
  end
end

#started?Boolean

Check if group was already started.

Returns:

  • (Boolean)

    True if group was already started.



68
69
70
# File 'lib/multi_process/group.rb', line 68

def started?
  processes.any?(&:started?)
end

#stopObject

Stop all processes.



74
75
76
# File 'lib/multi_process/group.rb', line 74

def stop
  processes.each(&:stop)
end

#wait(timeout: nil) ⇒ Object

Wait until all process terminated.

Parameters:

  • opts (Hash)

    Options.



84
85
86
87
88
89
90
# File 'lib/multi_process/group.rb', line 84

def wait(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait }
  else
    processes.each(&:wait)
  end
end

#wait!(timeout: nil) ⇒ Object

Wait until all process terminated.

Raise an error if a process exists unsuccessfully.

Parameters:

  • opts (Hash)

    Options.



99
100
101
102
103
104
105
# File 'lib/multi_process/group.rb', line 99

def wait!(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait! }
  else
    processes.each(&:wait!)
  end
end