Class: DRbQS::Worker::ProcessSet
- Inherits:
-
Object
- Object
- DRbQS::Worker::ProcessSet
- Defined in:
- lib/drbqs/worker/worker_process_set.rb
Constant Summary collapse
- WAITALL_INTERVAL_TIME =
0.1
Instance Attribute Summary collapse
-
#process ⇒ Object
readonly
Returns the value of attribute process.
Instance Method Summary collapse
- #all_processes ⇒ Object
-
#calculating?(key) ⇒ Boolean
Return true if the process of +key+ is calculating.
- #create_process(*keys) ⇒ Object
-
#exist?(key) ⇒ Boolean
Return true if the process of +key+ exists.
- #has_process? ⇒ Boolean
-
#initialize(process_class = nil) ⇒ ProcessSet
constructor
A new instance of ProcessSet.
- #kill_all_processes ⇒ Object
- #on_error(&block) ⇒ Object
- #on_result(&block) ⇒ Object
- #prepare_to_exit(key = nil) ⇒ Object
-
#respond_signal ⇒ Object
Read IOs and respond signals from chiled processes.
- #send_task(key, dumped_task_ary) ⇒ Object
- #waitall(interval_time = nil) ⇒ Object
-
#waiting?(key) ⇒ Boolean
Return true if the process +key+ does not calculate any tasks.
-
#waiting_processes ⇒ Object
Return keys of processes not calculating a task.
Constructor Details
#initialize(process_class = nil) ⇒ ProcessSet
Returns a new instance of ProcessSet.
11 12 13 14 15 16 17 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 11 def initialize(process_class = nil) @process_class = process_class || DRbQS::Worker::SimpleForkedProcess @process = {} @result = Queue.new @on_error = nil @on_result = nil end |
Instance Attribute Details
#process ⇒ Object (readonly)
Returns the value of attribute process.
9 10 11 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 9 def process @process end |
Instance Method Details
#all_processes ⇒ Object
88 89 90 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 88 def all_processes @process.keys end |
#calculating?(key) ⇒ Boolean
Return true if the process of +key+ is calculating.
72 73 74 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 72 def calculating?(key) @process[key] && !@process[key][:task].empty? end |
#create_process(*keys) ⇒ Object
55 56 57 58 59 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 55 def create_process(*keys) keys.each do |key| get_process(key) end end |
#exist?(key) ⇒ Boolean
Return true if the process of +key+ exists.
63 64 65 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 63 def exist?(key) @process[key] end |
#has_process? ⇒ Boolean
67 68 69 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 67 def has_process? !@process.empty? end |
#kill_all_processes ⇒ Object
200 201 202 203 204 205 206 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 200 def kill_all_processes @process.each do |key, h| Process.detach(h[:pid]) Process.kill("KILL", h[:pid]) end @process.clear end |
#on_error(&block) ⇒ Object
19 20 21 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 19 def on_error(&block) @on_error = block end |
#on_result(&block) ⇒ Object
23 24 25 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 23 def on_result(&block) @on_result = block end |
#prepare_to_exit(key = nil) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 119 def prepare_to_exit(key = nil) if key if h = send_object(key, :prepare_to_exit) h[:exit] = true end else @process.each do |key, h| prepare_to_exit(key) end end end |
#respond_signal ⇒ Object
Read IOs and respond signals from chiled processes. If there is no data from child processes then the method returns false. Otherwise, true. Types of signals are :result, :node_error, :finish_preparing_to_exit.
- :result Execute callback set by DRbQS::Worker::ProcessSet#on_result.
- :node_error Execute callback set by DRbQS::Worker::ProcessSet#on_error.
- :finish_preparing_to_exit Send :exit signale to the process and delete from list of child processes.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 151 def respond_signal num = 0 to_be_deleted = [] @process.each do |key, h| if !h[:task].empty? || h[:exit] data = '' begin loop do data << h[:in].read_nonblock(READ_BYTE_SIZE) end rescue IO::WaitReadable rescue $stderr.puts "Stored data: " + data.inspect raise end if !data.empty? num += 1 h[:unpacker].feed_each(data) do |ary| response_type, response = ary case response_type when :result task_id, result = response h[:task].delete(task_id) if @on_result @on_result.call(key, [task_id, result]) else $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with results from child processes." end when :node_error if @on_error @on_error.call(key, response) else $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with error from child processes." end when :finish_preparing_to_exit delete_process(key) to_be_deleted << key end end end end end to_be_deleted.each do |key| @process.delete(key) end to_be_deleted.clear num > 0 end |
#send_task(key, dumped_task_ary) ⇒ Object
109 110 111 112 113 114 115 116 117 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 109 def send_task(key, dumped_task_ary) if h = send_object(key, dumped_task_ary) if dumped_task_ary[0] h[:task] << dumped_task_ary[0] end else raise "Process #{key.inspect} does not exist." end end |
#waitall(interval_time = nil) ⇒ Object
210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 210 def waitall(interval_time = nil) unless @process.all? { |key, h| h[:exit] } return nil end t = interval_time || WAITALL_INTERVAL_TIME until @process.empty? respond_signal Kernel.sleep(t) end until Process.waitall == [] Kernel.sleep(t) end true end |
#waiting?(key) ⇒ Boolean
Return true if the process +key+ does not calculate any tasks.
77 78 79 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 77 def waiting?(key) !calculating?(key) end |
#waiting_processes ⇒ Object
Return keys of processes not calculating a task.
82 83 84 85 86 |
# File 'lib/drbqs/worker/worker_process_set.rb', line 82 def waiting_processes @process.keys.select do |key| @process[key][:task].empty? end end |