Class: MultiProcess::Group
- Inherits:
-
Object
- Object
- MultiProcess::Group
- Defined in:
- lib/multi_process/group.rb
Overview
Store and run a group of processes.
Instance Attribute Summary collapse
-
#partition ⇒ Object
readonly
Partition size.
-
#processes ⇒ Object
readonly
Return list of processes.
-
#receiver ⇒ Object
Receiver all processes in group should use.
Instance Method Summary collapse
-
#<<(procs) ⇒ Object
Add new process or list of processes.
-
#alive? ⇒ Boolean
Check if group is alive e.g.
-
#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object
Wait until group is available.
-
#available? ⇒ Boolean
Check if group is available.
-
#initialize(receiver: nil, partition: nil) ⇒ Group
constructor
Create new process group.
-
#run(delay: nil, timeout: nil) ⇒ Object
Start all process and wait for them to terminate.
-
#run!(delay: nil, timeout: nil) ⇒ Object
Start all process and wait for them to terminate.
-
#start(delay: nil) ⇒ Object
Start all process in group.
-
#started? ⇒ Boolean
Check if group was already started.
-
#stop ⇒ Object
Stop all processes.
-
#wait(timeout: nil) ⇒ Object
Wait until all process terminated.
-
#wait!(timeout: nil) ⇒ Object
Wait until all process terminated.
Constructor Details
#initialize(receiver: nil, partition: nil) ⇒ Group
Create new process group.
27 28 29 30 31 32 |
# File 'lib/multi_process/group.rb', line 27 def initialize(receiver: nil, partition: nil) @processes = [] @receiver = receiver || MultiProcess::Logger.global @partition = partition ? partition.to_i : 0 @mutex = Mutex.new end |
Instance Attribute Details
#partition ⇒ Object (readonly)
Partition size.
19 20 21 |
# File 'lib/multi_process/group.rb', line 19 def partition @partition end |
#processes ⇒ Object (readonly)
Return list of processes.
10 11 12 |
# File 'lib/multi_process/group.rb', line 10 def processes @processes end |
#receiver ⇒ Object
Receiver all processes in group should use.
If changed only affect new added processes.
16 17 18 |
# File 'lib/multi_process/group.rb', line 16 def receiver @receiver end |
Instance Method Details
#<<(procs) ⇒ Object
Add new process or list of processes.
If group was already started added processes will also be started.
40 41 42 43 44 45 46 47 |
# File 'lib/multi_process/group.rb', line 40 def <<(procs) Array(procs).flatten.each do |process| processes << process process.receiver = receiver process.start if started? end end |
#alive? ⇒ Boolean
Check if group is alive e.g. if at least on process is alive.
151 152 153 |
# File 'lib/multi_process/group.rb', line 151 def alive? processes.any?(&:alive?) end |
#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object
Wait until group is available. This implies waiting until all processes in group are available.
Processes will not be stopped if timeout occurs.
171 172 173 174 175 |
# File 'lib/multi_process/group.rb', line 171 def available!(timeout: MultiProcess::DEFAULT_TIMEOUT) Timeout.timeout timeout do processes.each(&:available!) end end |
#available? ⇒ Boolean
Check if group is available. The group is available if all processes are available.
158 159 160 |
# File 'lib/multi_process/group.rb', line 158 def available? processes.all?(:available?) end |
#run(delay: nil, timeout: nil) ⇒ Object
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/multi_process/group.rb', line 115 def run(delay: nil, timeout: nil) if partition.positive? run_partition(&:run) else start(delay: delay) wait(timeout: timeout) end ensure stop end |
#run!(delay: nil, timeout: nil) ⇒ Object
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/multi_process/group.rb', line 136 def run!(delay: nil, timeout: nil) if partition.positive? run_partition(&:run!) else start(delay: delay) wait!(timeout: timeout) end ensure stop end |
#start(delay: nil) ⇒ Object
Start all process in group.
Call blocks until all processes are started.
55 56 57 58 59 60 61 62 |
# File 'lib/multi_process/group.rb', line 55 def start(delay: nil) processes.each do |process| next if process.started? process.start sleep delay if delay end end |
#started? ⇒ Boolean
Check if group was already started.
68 69 70 |
# File 'lib/multi_process/group.rb', line 68 def started? processes.any?(&:started?) end |
#stop ⇒ Object
Stop all processes.
74 75 76 |
# File 'lib/multi_process/group.rb', line 74 def stop processes.each(&:stop) end |
#wait(timeout: nil) ⇒ Object
Wait until all process terminated.
84 85 86 87 88 89 90 |
# File 'lib/multi_process/group.rb', line 84 def wait(timeout: nil) if timeout ::Timeout.timeout(timeout) { wait } else processes.each(&:wait) end end |
#wait!(timeout: nil) ⇒ Object
Wait until all process terminated.
Raise an error if a process exists unsuccessfully.
99 100 101 102 103 104 105 |
# File 'lib/multi_process/group.rb', line 99 def wait!(timeout: nil) if timeout ::Timeout.timeout(timeout) { wait! } else processes.each(&:wait!) end end |