Class: DRbQS::Worker::ProcessSet

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/worker/worker_process_set.rb

Constant Summary collapse

WAITALL_INTERVAL_TIME =
0.1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(process_class = nil) ⇒ ProcessSet

Returns a new instance of ProcessSet.



11
12
13
14
15
16
17
# File 'lib/drbqs/worker/worker_process_set.rb', line 11

# Set up an empty process set.
#
# @param process_class [Class, nil] value stored in @process_class;
#   DRbQS::Worker::SimpleForkedProcess is used when nil or omitted.
def initialize(process_class = nil)
  process_class ||= DRbQS::Worker::SimpleForkedProcess
  @process_class = process_class
  # Table of child-process entries, indexed by process key.
  @process = {}
  @result = Queue.new
  # Callbacks stay unset until #on_error / #on_result are called.
  @on_error = @on_result = nil
end

Instance Attribute Details

#processObject (readonly)

Returns the value of attribute process.



9
10
11
# File 'lib/drbqs/worker/worker_process_set.rb', line 9

# Reader for the table of registered child processes.
#
# @return [Hash] the @process table itself (not a copy), indexed by
#   process key.
def process
  @process
end

Instance Method Details

#all_processesObject



88
89
90
# File 'lib/drbqs/worker/worker_process_set.rb', line 88

# List the keys of every registered child process.
#
# @return [Array] a new array holding the keys of @process.
def all_processes
  @process.keys
end

#calculating?(key) ⇒ Boolean

Return true if the process of +key+ is calculating.

Returns:

  • (Boolean)


72
73
74
# File 'lib/drbqs/worker/worker_process_set.rb', line 72

# Tell whether the child process registered under +key+ still has
# tasks in its :task list.
#
# @param key key of the child process.
# @return [Boolean, nil] true when the :task list is non-empty, false
#   when it is empty, and nil when nothing is registered under +key+.
def calculating?(key)
  entry = @process[key]
  entry && !entry[:task].empty?
end

#create_process(*keys) ⇒ Object



55
56
57
58
59
# File 'lib/drbqs/worker/worker_process_set.rb', line 55

# Call get_process once for each given key, in the order given.
#
# NOTE(review): get_process is defined outside this view; presumably it
# creates the child process when it does not exist yet -- confirm.
#
# @param keys keys of the child processes.
# @return [Array] the +keys+ array.
def create_process(*keys)
  keys.each { |process_key| get_process(process_key) }
end

#exist?(key) ⇒ Boolean

Return true if the process of +key+ exists.

Parameters:

  • key (Symbol, nil)

    Key of child process or nil.

Returns:

  • (Boolean)


63
64
65
# File 'lib/drbqs/worker/worker_process_set.rb', line 63

# Look up the child process registered under +key+.
#
# The result is meant to be used for its truthiness: it is the stored
# entry itself rather than a strict true/false.
#
# @param key [Symbol, nil] key of the child process.
# @return [Object, nil] the entry stored in @process for +key+, or nil
#   when there is none.
def exist?(key)
  @process[key]
end

#has_process?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/drbqs/worker/worker_process_set.rb', line 67

# Tell whether at least one child process is registered.
#
# @return [Boolean] true when @process holds any entry, false otherwise.
def has_process?
  # Without a block, Hash#any? is true for every non-empty hash because
  # each element is a [key, value] pair, which is always truthy.
  @process.any?
end

#kill_all_processesObject



200
201
202
203
204
205
206
# File 'lib/drbqs/worker/worker_process_set.rb', line 200

# Forcibly terminate every registered child process and empty the set.
#
# Each child is detached first, so a watcher thread reaps it once it
# dies, and is then sent SIGKILL. A child that has already exited and
# been reaped makes Process.kill raise Errno::ESRCH; that error is
# ignored so the remaining children are still killed and the table is
# always cleared.
#
# @return [Hash] the now-empty @process table.
def kill_all_processes
  @process.each_value do |h|
    pid = h[:pid]
    Process.detach(pid)
    begin
      Process.kill("KILL", pid)
    rescue Errno::ESRCH
      # The process is already gone; there is nothing left to kill.
    end
  end
  @process.clear
end

#on_error(&block) ⇒ Object



19
20
21
# File 'lib/drbqs/worker/worker_process_set.rb', line 19

# Register the callback that #respond_signal runs for :node_error
# responses. Calling this without a block stores nil, i.e. removes the
# callback.
#
# @yield [key, response] key of the child process and the error
#   response it sent.
# @return [Proc, nil] the stored block.
def on_error(&block)
  @on_error = block
end

#on_result(&block) ⇒ Object



23
24
25
# File 'lib/drbqs/worker/worker_process_set.rb', line 23

# Register the callback that #respond_signal runs for :result
# responses. Calling this without a block stores nil, i.e. removes the
# callback.
#
# @yield [key, result_ary] key of the child process and the array
#   [task_id, result].
# @return [Proc, nil] the stored block.
def on_result(&block)
  @on_result = block
end

#prepare_to_exit(key = nil) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/drbqs/worker/worker_process_set.rb', line 119

# Ask child processes to prepare to exit.
#
# Sends :prepare_to_exit through send_object and, when send_object
# returns the process entry, sets the entry's :exit flag.
#
# With a truthy +key+ only that process is addressed. With nil (the
# default) every registered process is addressed. Each process is
# handled inline instead of re-entering this method, so a process
# registered under the key nil cannot trigger endless recursion.
#
# @param key key of the child process, or nil for all processes.
# @return [true, nil, Hash] for a single key, true when the flag was
#   set and nil otherwise; for all processes, the @process table.
def prepare_to_exit(key = nil)
  if key
    h = send_object(key, :prepare_to_exit)
    h[:exit] = true if h
  else
    @process.each_key do |process_key|
      h = send_object(process_key, :prepare_to_exit)
      h[:exit] = true if h
    end
  end
end

#respond_signalObject

Read IOs and respond to signals from child processes. The method returns false if there is no data from any child process, and true otherwise. Types of signals are :result, :node_error, :finish_preparing_to_exit.

  • :result Execute callback set by DRbQS::Worker::ProcessSet#on_result.
  • :node_error Execute callback set by DRbQS::Worker::ProcessSet#on_error.
  • :finish_preparing_to_exit Send the :exit signal to the process and delete it from the list of child processes.


151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/drbqs/worker/worker_process_set.rb', line 151

# Drain the pipes of the child processes and dispatch their responses.
#
# Only processes that have pending tasks or whose :exit flag is set are
# polled. Everything currently readable from the process's :in IO is
# collected and fed to its :unpacker, and each unpacked
# [response_type, response] pair is handled:
#
# * :result -- removes the task id from the :task list and calls the
#   on_result callback with (key, [task_id, result]).
# * :node_error -- calls the on_error callback with (key, response).
# * :finish_preparing_to_exit -- calls delete_process(key) and removes
#   the entry from @process after the iteration has finished.
#
# When a callback is not set, a message is written to $stderr instead.
#
# @return [Boolean] true when at least one process delivered data.
def respond_signal
  responded = 0
  finished_keys = []
  @process.each do |key, h|
    # Idle processes that were not asked to exit have nothing to report.
    next if h[:task].empty? && !h[:exit]
    data = ''
    begin
      loop do
        data << h[:in].read_nonblock(READ_BYTE_SIZE)
      end
    rescue IO::WaitReadable
      # Nothing more to read right now; continue with what was collected.
    rescue
      # Show what was read so far, then let the error propagate.
      $stderr.puts "Stored data: " + data.inspect
      raise
    end
    next if data.empty?
    responded += 1
    h[:unpacker].feed_each(data) do |ary|
      response_type, response = ary
      case response_type
      when :result
        task_id, result = response
        h[:task].delete(task_id)
        if @on_result
          @on_result.call(key, [task_id, result])
        else
          $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with results from child processes."
        end
      when :node_error
        if @on_error
          @on_error.call(key, response)
        else
          $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with error from child processes."
        end
      when :finish_preparing_to_exit
        delete_process(key)
        # @process is being iterated, so removal is postponed.
        finished_keys << key
      end
    end
  end
  finished_keys.each { |key| @process.delete(key) }
  finished_keys.clear
  responded > 0
end

#send_task(key, dumped_task_ary) ⇒ Object

Parameters:

  • dumped_task_ary (Array)

    is [task_id, obj, method_name, args].



109
110
111
112
113
114
115
116
117
# File 'lib/drbqs/worker/worker_process_set.rb', line 109

# Send a dumped task to the child process +key+ through send_object and
# record its task id in the process's :task list.
#
# @param key key of the child process.
# @param dumped_task_ary [Array] [task_id, obj, method_name, args]; a
#   falsy task_id is sent but not recorded.
# @return [Object, nil] the :task list after appending, or nil when the
#   task id is falsy.
# @raise [RuntimeError] when send_object returns a falsy value, i.e. no
#   process entry is available for +key+.
def send_task(key, dumped_task_ary)
  entry = send_object(key, dumped_task_ary)
  raise "Process #{key.inspect} does not exist." unless entry
  task_id = dumped_task_ary[0]
  entry[:task] << task_id if task_id
end

#waitall(interval_time = nil) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/drbqs/worker/worker_process_set.rb', line 210

# Wait until every child process has finished exiting.
#
# Nothing is done unless every registered process already has its :exit
# flag set. Otherwise #respond_signal is polled until @process is
# empty, and then Process.waitall is called until it reports no more
# children.
#
# @param interval_time [Numeric, nil] seconds to sleep between polls;
#   WAITALL_INTERVAL_TIME is used when nil.
# @return [true, nil] true after all children are gone, nil when some
#   process has not been flagged to exit.
def waitall(interval_time = nil)
  return nil unless @process.all? { |_key, h| h[:exit] }
  pause = interval_time || WAITALL_INTERVAL_TIME
  until @process.empty?
    respond_signal
    Kernel.sleep(pause)
  end
  Kernel.sleep(pause) until Process.waitall.empty?
  true
end

#waiting?(key) ⇒ Boolean

Return true if the process +key+ does not calculate any tasks.

Returns:

  • (Boolean)


77
78
79
# File 'lib/drbqs/worker/worker_process_set.rb', line 77

# Tell whether the process +key+ is not calculating any task.
#
# This is the negation of #calculating?, so it is also true when no
# process is registered under +key+ (calculating? returns nil then).
#
# @param key key of the child process.
# @return [Boolean]
def waiting?(key)
  !calculating?(key)
end

#waiting_processesObject

Return keys of processes not calculating a task.



82
83
84
85
86
# File 'lib/drbqs/worker/worker_process_set.rb', line 82

# Collect the keys of the processes whose :task list is empty.
#
# @return [Array] keys of idle processes, in the order of @process.
def waiting_processes
  idle = @process.select { |_key, entry| entry[:task].empty? }
  idle.keys
end