Class: DRbQS::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/worker/worker.rb,
lib/drbqs/worker/serialize.rb,
lib/drbqs/worker/forked_process.rb,
lib/drbqs/worker/worker_process_set.rb

Overview

We can use DRbQS::Worker to send some child processes. Note that DRbQS::Worker is not used in DRbQS::Node class and then is not included in main part of DRbQS.

Defined Under Namespace

Classes: ForkedProcess, ProcessSet, Serialize, SimpleForkedProcess

Constant Summary collapse

READ_BYTE_SIZE =
10240

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Worker

Returns a new instance of Worker.



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/drbqs/worker/worker.rb', line 10

def initialize(opts = {})
  @process = DRbQS::Worker::ProcessSet.new(opts[:class])
  if opts[:key]
    opts[:key].each do |key|
      @process.create_process(key)
    end
  end
  @state = Hash.new { |h, k| h[k] = Hash.new }
  @task_pool = {}
  @task_group = Hash.new { |h, k| h[k] = Array.new }
  @task_num = 0
end

Instance Attribute Details

#processObject (readonly)

Returns the value of attribute process.



8
9
10
# File 'lib/drbqs/worker/worker.rb', line 8

def process
  @process
end

Instance Method Details

#add_task(task, broadcast = nil) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/drbqs/worker/worker.rb', line 67

def add_task(task, broadcast = nil)
  if broadcast
    @process.all_processes.each do |proc_key|
      send_task(proc_key, nil, task)
    end
  else
    task_id = (@task_num += 1)
    @task_pool[task_id] = { :task => task }
    @task_group[task.group] << task_id
    task_id
  end
end

#calculating?Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/drbqs/worker/worker.rb', line 23

def calculating?
  !@task_pool.empty?
end

#finish(interval_time = 1) ⇒ Object

Send signal to exit to all child processes and wait the completion with sleep +interval_time+.

Parameters:

  • interval_time (Numeric) (defaults to: 1)

    An argument of Kernel#sleep.



131
132
133
134
# File 'lib/drbqs/worker/worker.rb', line 131

def finish(interval_time = 1)
  @process.prepare_to_exit
  @process.waitall(interval_time)
end

#group(grp, *keys) ⇒ Object



39
40
41
42
43
# File 'lib/drbqs/worker/worker.rb', line 39

def group(grp, *keys)
  keys.each do |key|
    (@state[key][:group] ||= []) << grp
  end
end

#on_error(&block) ⇒ Object



57
58
59
# File 'lib/drbqs/worker/worker.rb', line 57

def on_error(&block)
  @process.on_error(&block)
end

#on_result(&block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/drbqs/worker/worker.rb', line 45

def on_result(&block)
  @process.on_result do |proc_key, ary|
    task_id, result = ary
    if task_data = @task_pool.delete(task_id)
      task = task_data[:task]
      @task_group[task.group].delete(task_id)
      task.exec_hook(self, result)
    end
    block.call(proc_key, ary)
  end
end

#sleep(*keys) ⇒ Object



27
28
29
30
31
# File 'lib/drbqs/worker/worker.rb', line 27

def sleep(*keys)
  keys.each do |key|
    @state[key][:sleep] = true
  end
end

#stepObject

This method sends a stored task for each process that is not calculating a task and responds signals from child processes.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/drbqs/worker/worker.rb', line 82

def step
  @process.waiting_processes.each do |proc_key|
    if @state[proc_key][:sleep]
      next
    end
    catch(:add) do
      grps = (@state[proc_key][:group] || []) + [DRbQS::Task::DEFAULT_GROUP]
      grps.each do |gr|
        @task_group[gr].each do |task_id|
          task_data = @task_pool[task_id]
          if !task_data[:calculate]
            send_task(proc_key, task_id, task_data[:task])
            @task_pool[task_id][:calculate] = true
            throw :add
          end
        end
      end
    end
  end
  @process.respond_signal
end

#wait(task_id, interval_time) ⇒ Object

Wait finish of task +task_id+ with sleep +interval_time+.

Parameters:

  • task_id (Fixnum)
  • interval_time (Numeric)

    An argument of Kernel#sleep.



107
108
109
110
111
112
113
114
115
116
117
# File 'lib/drbqs/worker/worker.rb', line 107

def wait(task_id, interval_time)
  if @task_pool[task_id]
    loop do
      step
      unless @task_pool[task_id]
        return true
      end
      Kernel.sleep(interval_time)
    end
  end
end

#waitall(interval_time) ⇒ Object

Wait finishes of all tasks with sleep +interval_time+.

Parameters:

  • interval_time (Numeric)

    An argument of Kernel#sleep.



121
122
123
124
125
126
# File 'lib/drbqs/worker/worker.rb', line 121

def waitall(interval_time)
  while calculating?
    step
    Kernel.sleep(interval_time)
  end
end

#wakeup(*keys) ⇒ Object



33
34
35
36
37
# File 'lib/drbqs/worker/worker.rb', line 33

def wakeup(*keys)
  keys.each do |key|
    @state[key][:sleep] = false
  end
end