Class: DRbQS::Worker
- Inherits: Object
  - Object
    - DRbQS::Worker
- Defined in:
- lib/drbqs/worker/worker.rb,
lib/drbqs/worker/serialize.rb,
lib/drbqs/worker/forked_process.rb,
lib/drbqs/worker/worker_process_set.rb
Overview
DRbQS::Worker can be used to send tasks to child processes. Note that DRbQS::Worker is not used by the DRbQS::Node class and is therefore not part of the core of DRbQS.
Defined Under Namespace
Classes: ForkedProcess, ProcessSet, Serialize, SimpleForkedProcess
Constant Summary
- READ_BYTE_SIZE = 10240
Instance Attribute Summary
- #process ⇒ Object (readonly)
  Returns the value of attribute process.
Instance Method Summary
- #add_task(task, broadcast = nil) ⇒ Object
- #calculating? ⇒ Boolean
- #finish(interval_time = 1) ⇒ Object
  Sends an exit signal to all child processes and waits for them to finish, sleeping for +interval_time+ between checks.
- #group(grp, *keys) ⇒ Object
- #initialize(opts = {}) ⇒ Worker (constructor)
  A new instance of Worker.
- #on_error(&block) ⇒ Object
- #on_result(&block) ⇒ Object
- #sleep(*keys) ⇒ Object
- #step ⇒ Object
  Sends a stored task to each process that is not currently calculating a task, then responds to signals from child processes.
- #wait(task_id, interval_time) ⇒ Object
  Waits for task +task_id+ to finish, sleeping for +interval_time+ between checks.
- #waitall(interval_time) ⇒ Object
  Waits for all tasks to finish, sleeping for +interval_time+ between checks.
- #wakeup(*keys) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Worker
Returns a new instance of Worker.
    # File 'lib/drbqs/worker/worker.rb', line 10
    def initialize(opts = {})
      @process = DRbQS::Worker::ProcessSet.new(opts[:class])
      if opts[:key]
        opts[:key].each do |key|
          @process.create_process(key)
        end
      end
      @state = Hash.new { |h, k| h[k] = Hash.new }
      @task_pool = {}
      @task_group = Hash.new { |h, k| h[k] = Array.new }
      @task_num = 0
    end
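The constructor relies on the block form of `Hash.new`, so per-process state and per-group task lists are created on first access. The following is a minimal standalone sketch of that behavior. The helper names `build_state_sketch` and `build_task_group_sketch` and the keys `:proc1`, `:proc2`, and `:group_a` are made up for illustration; nothing here touches DRbQS itself.

```ruby
# Hypothetical helpers that build the same kinds of hashes as the constructor.
def build_state_sketch
  Hash.new { |h, k| h[k] = Hash.new }
end

def build_task_group_sketch
  Hash.new { |h, k| h[k] = Array.new }
end

state = build_state_sketch
task_group = build_task_group_sketch

# Reading a missing key creates and stores a fresh container for that key.
state[:proc1][:sleep] = true
task_group[:group_a] << 1
task_group[:group_a] << 2

# Each key gets its own container, so entries never share state.
state[:proc2][:sleep] = false
```

Because the default block stores the new container in the hash, later reads of the same key return the same object instead of a throwaway default.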
Instance Attribute Details
#process ⇒ Object (readonly)
Returns the value of attribute process.
    # File 'lib/drbqs/worker/worker.rb', line 8
    def process
      @process
    end
Instance Method Details
#add_task(task, broadcast = nil) ⇒ Object
    # File 'lib/drbqs/worker/worker.rb', line 67
    def add_task(task, broadcast = nil)
      if broadcast
        @process.all_processes.each do |proc_key|
          send_task(proc_key, nil, task)
        end
      else
        task_id = (@task_num += 1)
        @task_pool[task_id] = { :task => task }
        @task_group[task.group] << task_id
        task_id
      end
    end
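Reading the listing above, a broadcast task appears to be sent to every process immediately with a nil task id and is never stored in the pool, while a normal task gets the next integer id and is queued under its group. The sketch below mimics that bookkeeping in isolation. `AddTaskSketch` and `QueuedTaskSketch` are hypothetical stand-ins, and the real `send_task` is replaced by appending to an array.

```ruby
# Hypothetical stand-in for a task; only #group matters here.
QueuedTaskSketch = Struct.new(:name, :group)

class AddTaskSketch
  attr_reader :task_pool, :task_group, :sent

  def initialize(process_keys)
    @process_keys = process_keys
    @task_pool = {}
    @task_group = Hash.new { |h, k| h[k] = [] }
    @task_num = 0
    @sent = [] # records [proc_key, task_id, task] instead of really sending
  end

  def add_task(task, broadcast = nil)
    if broadcast
      # Broadcast tasks go to every process at once and are not tracked.
      @process_keys.each { |key| @sent << [key, nil, task] }
    else
      # Normal tasks get a fresh id and wait in the pool.
      task_id = (@task_num += 1)
      @task_pool[task_id] = { :task => task }
      @task_group[task.group] << task_id
      task_id
    end
  end
end

queue = AddTaskSketch.new([:a, :b])
first_id = queue.add_task(QueuedTaskSketch.new("t1", :default))
second_id = queue.add_task(QueuedTaskSketch.new("t2", :default))
queue.add_task(QueuedTaskSketch.new("setup", :default), true)
```

Only the non-broadcast calls return an id that can be tracked later, since the broadcast branch never adds an entry to the pool.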
#calculating? ⇒ Boolean
    # File 'lib/drbqs/worker/worker.rb', line 23
    def calculating?
      !@task_pool.empty?
    end
#finish(interval_time = 1) ⇒ Object
Sends an exit signal to all child processes and waits for them to finish, sleeping for +interval_time+ between checks.
    # File 'lib/drbqs/worker/worker.rb', line 131
    def finish(interval_time = 1)
      @process.prepare_to_exit
      @process.waitall(interval_time)
    end
#group(grp, *keys) ⇒ Object
    # File 'lib/drbqs/worker/worker.rb', line 39
    def group(grp, *keys)
      keys.each do |key|
        (@state[key][:group] ||= []) << grp
      end
    end
#on_error(&block) ⇒ Object
    # File 'lib/drbqs/worker/worker.rb', line 57
    def on_error(&block)
      @process.on_error(&block)
    end
#on_result(&block) ⇒ Object
    # File 'lib/drbqs/worker/worker.rb', line 45
    def on_result(&block)
      @process.on_result do |proc_key, ary|
        task_id, result = ary
        if task_data = @task_pool.delete(task_id)
          task = task_data[:task]
          @task_group[task.group].delete(task_id)
          task.exec_hook(self, result)
        end
        block.call(proc_key, ary)
      end
    end
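The result handler above does its own bookkeeping before calling the user's block: it removes the finished task from the pool and from its group, runs the task's hook, and only then passes the raw result on. A minimal sketch of that order of operations follows. `HookedTaskSketch` and `handle_result_sketch` are hypothetical names, and the real `DRbQS::Task` and `ProcessSet` are not involved.

```ruby
# Hypothetical task that just records what its hook receives.
class HookedTaskSketch
  attr_reader :group, :hook_results

  def initialize(group)
    @group = group
    @hook_results = []
  end

  def exec_hook(_worker, result)
    @hook_results << result
  end
end

# Mirrors the body of the block passed to @process.on_result.
def handle_result_sketch(proc_key, ary, task_pool, task_group, worker = nil)
  task_id, result = ary
  if (task_data = task_pool.delete(task_id))
    task = task_data[:task]
    task_group[task.group].delete(task_id)
    task.exec_hook(worker, result)
  end
  # The user's block always runs, even when the id was not in the pool.
  yield(proc_key, ary) if block_given?
end

task = HookedTaskSketch.new(:g)
pool = { 1 => { :task => task } }
groups = { :g => [1] }
seen = []
handle_result_sketch(:proc1, [1, 42], pool, groups) { |key, ary| seen << [key, ary] }
handle_result_sketch(:proc1, [nil, "x"], pool, groups) { |key, ary| seen << [key, ary] }
```

In this sketch, a result whose id is not in the pool (for example a nil id) skips the bookkeeping and the hook but still reaches the user's block.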
#sleep(*keys) ⇒ Object
    # File 'lib/drbqs/worker/worker.rb', line 27
    def sleep(*keys)
      keys.each do |key|
        @state[key][:sleep] = true
      end
    end
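Because this method is named `sleep`, it shadows `Kernel#sleep` for unqualified calls made inside the class, which is presumably why the waiting methods in this class call `Kernel.sleep` explicitly. The sketch below shows the effect with a hypothetical class, `SleepShadowSketch`; its `pause` method is an invention for illustration only.

```ruby
class SleepShadowSketch
  attr_reader :state

  def initialize
    @state = Hash.new { |h, k| h[k] = {} }
  end

  # Marks processes as sleeping; shadows Kernel#sleep inside this class.
  def sleep(*keys)
    keys.each { |key| @state[key][:sleep] = true }
  end

  def wakeup(*keys)
    keys.each { |key| @state[key][:sleep] = false }
  end

  def pause(seconds)
    # A bare `sleep(seconds)` here would call the method above and treat
    # the number as a process key, so the module function is used instead.
    Kernel.sleep(seconds)
  end
end

sketch = SleepShadowSketch.new
sketch.sleep(:a, :b)
sketch.wakeup(:b)
sketch.pause(0.01)
```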
#step ⇒ Object
Sends a stored task to each process that is not currently calculating a task, then responds to signals from child processes.
    # File 'lib/drbqs/worker/worker.rb', line 82
    def step
      @process.waiting_processes.each do |proc_key|
        if @state[proc_key][:sleep]
          next
        end
        catch(:add) do
          grps = (@state[proc_key][:group] || []) + [DRbQS::Task::DEFAULT_GROUP]
          grps.each do |gr|
            @task_group[gr].each do |task_id|
              task_data = @task_pool[task_id]
              if !task_data[:calculate]
                send_task(proc_key, task_id, task_data[:task])
                @task_pool[task_id][:calculate] = true
                throw :add
              end
            end
          end
        end
      end
      @process.respond_signal
    end
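The selection logic in this method can be read as: skip sleeping processes, search the process's own groups before the default group, and take the first task that is not already being calculated. The sketch below isolates that search as a pure function. `next_task_id_sketch` is a hypothetical helper, and `DEFAULT_GROUP_SKETCH` is an assumed placeholder value, not the actual value of `DRbQS::Task::DEFAULT_GROUP`. Unlike the original, it passes the chosen id through `throw` so that `catch` returns it.

```ruby
DEFAULT_GROUP_SKETCH = :default # assumed placeholder for DRbQS::Task::DEFAULT_GROUP

# Returns the id of the task a waiting process would receive, or nil.
def next_task_id_sketch(proc_state, task_group, task_pool)
  return nil if proc_state[:sleep]
  groups = (proc_state[:group] || []) + [DEFAULT_GROUP_SKETCH]
  catch(:add) do
    groups.each do |gr|
      task_group[gr].each do |task_id|
        # Leave both loops as soon as an idle task is found.
        throw :add, task_id unless task_pool[task_id][:calculate]
      end
    end
    nil # no throw happened: nothing to send
  end
end

task_group = Hash.new { |h, k| h[k] = [] }
task_group[:default] = [1, 2]
task_group[:gpu] = [3]
task_pool = { 1 => { :calculate => true }, 2 => {}, 3 => {} }

next_task_id_sketch({ :group => [:gpu] }, task_group, task_pool) # => 3
next_task_id_sketch({}, task_group, task_pool)                   # => 2
next_task_id_sketch({ :sleep => true }, task_group, task_pool)   # => nil
```

Using `catch`/`throw` rather than `break` lets the search exit the nested group and task loops in one step, which matches the structure of the listing.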
#wait(task_id, interval_time) ⇒ Object
Waits for task +task_id+ to finish, calling #step and then sleeping for +interval_time+ seconds between checks.
    # File 'lib/drbqs/worker/worker.rb', line 107
    def wait(task_id, interval_time)
      if @task_pool[task_id]
        loop do
          step
          unless @task_pool[task_id]
            return true
          end
          Kernel.sleep(interval_time)
        end
      end
    end
#waitall(interval_time) ⇒ Object
Waits for all tasks to finish, calling #step and then sleeping for +interval_time+ seconds on each iteration.
    # File 'lib/drbqs/worker/worker.rb', line 121
    def waitall(interval_time)
      while calculating?
        step
        Kernel.sleep(interval_time)
      end
    end
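Both waiting methods are polling loops around `#step`. The sketch below reproduces the two loops against a fake pool so their return values and step counts can be observed without child processes. `PollingSketch` is a hypothetical class, and its `step` simply pretends that one task finishes per call, which is an assumption made only for the demonstration.

```ruby
class PollingSketch
  attr_reader :steps

  def initialize(task_ids)
    @task_pool = task_ids.to_h { |id| [id, {}] }
    @steps = 0
  end

  def calculating?
    !@task_pool.empty?
  end

  # Stand-in for #step: pretend the oldest task finishes on each call.
  def step
    @steps += 1
    @task_pool.shift
  end

  # Same shape as #wait: returns true once the task has left the pool,
  # or nil if the task was not in the pool to begin with.
  def wait(task_id, interval_time)
    if @task_pool[task_id]
      loop do
        step
        return true unless @task_pool[task_id]
        Kernel.sleep(interval_time)
      end
    end
  end

  # Same shape as #waitall: step and sleep until the pool is empty.
  def waitall(interval_time)
    while calculating?
      step
      Kernel.sleep(interval_time)
    end
  end
end

poller = PollingSketch.new([1, 2, 3])
poller.wait(2, 0.001)  # two steps are needed before task 2 is gone
poller.waitall(0.001)  # one more step clears the remaining task
```

As written in the listing, `waitall` still sleeps once after the step that completes the last task, whereas `wait` returns immediately after the step in which its task disappears.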
#wakeup(*keys) ⇒ Object
    # File 'lib/drbqs/worker/worker.rb', line 33
    def wakeup(*keys)
      keys.each do |key|
        @state[key][:sleep] = false
      end
    end