Class: Process::Group
- Inherits:
-
Object
- Object
- Process::Group
- Defined in:
- lib/process/group.rb,
lib/process/group/version.rb
Overview
A group of tasks which can be run asynchrnously using fibers. Someone must call Group#wait to ensure that all fibers eventually resume.
Defined Under Namespace
Constant Summary collapse
- VERSION =
"1.2.4"
Instance Attribute Summary collapse
-
#limit ⇒ Object
The maximum number of processes to run concurrently, or zero.
-
#running ⇒ Object
readonly
A table of currently running processes.
Class Method Summary collapse
Instance Method Summary collapse
- #async ⇒ Object
-
#available? ⇒ Boolean
Whether or not #spawn, #fork or #run can be scheduled immediately.
-
#blocking? ⇒ Boolean
Whether or not calling #spawn, #fork or #run would block the caller fiber (i.e. call Fiber.yield).
-
#fork(**options, &block) ⇒ Object
Fork a block as a child process.
-
#id ⇒ Object
The id of the process group, only valid if processes are currently running.
-
#initialize(limit: nil, terminal: Terminal::Device.new?) ⇒ Group
constructor
Create a new process group.
-
#kill(signal = :INT) ⇒ Object
Send a signal to all currently running processes.
- #queued? ⇒ Boolean
-
#run(*arguments, **options) ⇒ Object
Run a process in a new fiber, arguments have same meaning as Process#spawn.
-
#running? ⇒ Boolean
Are there processes currently running?.
-
#spawn(*arguments, **options) ⇒ Object
Run a specific command as a child process.
- #to_s ⇒ Object
-
#wait ⇒ Object
Wait for all running and queued processes to finish.
Constructor Details
#initialize(limit: nil, terminal: Terminal::Device.new?) ⇒ Group
Create a new process group. Can specify ‘limit:` which limits the maximum number of concurrent processes.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/process/group.rb', line 106 def initialize(limit: nil, terminal: Terminal::Device.new?) raise ArgumentError.new("Limit must be nil (unlimited) or > 0") unless limit == nil or limit > 0 @pid = Process.pid @terminal = terminal @queue = [] @limit = limit @running = {} @fiber = nil @pgid = nil # Whether we can actively schedule tasks or not: @waiting = false end |
Instance Attribute Details
#limit ⇒ Object
The maximum number of processes to run concurrently, or zero
129 130 131 |
# File 'lib/process/group.rb', line 129 def limit @limit end |
#running ⇒ Object (readonly)
A table of currently running processes.
126 127 128 |
# File 'lib/process/group.rb', line 126 def running @running end |
Class Method Details
Instance Method Details
#async ⇒ Object
156 157 158 159 160 |
# File 'lib/process/group.rb', line 156 def async Fiber.new do yield self end.resume end |
#available? ⇒ Boolean
Whether or not #spawn, #fork or #run can be scheduled immediately.
173 174 175 176 177 178 179 |
# File 'lib/process/group.rb', line 173 def available? if @limit @running.size < @limit else true end end |
#blocking? ⇒ Boolean
Whether or not calling #spawn, #fork or #run would block the caller fiber (i.e. call Fiber.yield).
182 183 184 |
# File 'lib/process/group.rb', line 182 def blocking? not available? end |
#fork(**options, &block) ⇒ Object
Fork a block as a child process.
168 169 170 |
# File 'lib/process/group.rb', line 168 def fork(**, &block) append! Fork.new(block, **) end |
#id ⇒ Object
The id of the process group, only valid if processes are currently running.
132 133 134 135 136 |
# File 'lib/process/group.rb', line 132 def id raise RuntimeError.new("No processes in group, no group id available.") if @running.size == 0 -@pgid end |
#kill(signal = :INT) ⇒ Object
Send a signal to all currently running processes. No-op unless #running?
227 228 229 230 231 |
# File 'lib/process/group.rb', line 227 def kill(signal = :INT) if running? Process.kill(signal, id) end end |
#queued? ⇒ Boolean
138 139 140 |
# File 'lib/process/group.rb', line 138 def queued? @queue.size > 0 end |
#run(*arguments, **options) ⇒ Object
Run a process in a new fiber, arguments have same meaning as Process#spawn.
148 149 150 151 152 153 154 |
# File 'lib/process/group.rb', line 148 def run(*arguments, **) Fiber.new do exit_status = self.spawn(*arguments, **) yield exit_status if block_given? end.resume end |
#running? ⇒ Boolean
Are there processes currently running?
143 144 145 |
# File 'lib/process/group.rb', line 143 def running? @running.size > 0 end |
#spawn(*arguments, **options) ⇒ Object
Run a specific command as a child process.
163 164 165 |
# File 'lib/process/group.rb', line 163 def spawn(*arguments, **) append! Spawn.new(arguments, **) end |
#to_s ⇒ Object
233 234 235 |
# File 'lib/process/group.rb', line 233 def to_s "#<#{self.class} running=#{@running.size} queued=#{@queue.size} limit=#{@limit} pgid=#{@pgid}>" end |
#wait ⇒ Object
Wait for all running and queued processes to finish. If you provide a block, it will be invoked before waiting, but within canonical signal handling machinery.
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/process/group.rb', line 187 def wait raise ArgumentError.new("Cannot call Process::Group#wait from child process!") unless @pid == Process.pid waiting do yield(self) if block_given? while running? process, status = wait_one schedule! process.resume(status) end end # No processes, process group is no longer valid: @pgid = nil return self rescue Interrupt # If the user interrupts the wait, interrupt the process group and wait for them to finish: self.kill(:INT) # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below: wait_all raise ensure # You'd only get here with running processes if some unexpected error was thrown in user code: begin self.kill(:TERM) rescue Errno::EPERM # Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition. end # Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller: wait_all end |