Class: DRbQS::Node::State
- Inherits:
-
Object
- Object
- DRbQS::Node::State
- Defined in:
- lib/drbqs/node/state.rb
Constant Summary collapse
- ALL_STATES =
[:sleep, :wait, :calculate, :exit]
- DEFAULT_SLEEP_TIME =
300
- LOADAVG_PATH =
'/proc/loadavg'
Instance Attribute Summary collapse
-
#calculating_task ⇒ Object
readonly
Value of state is :sleep, :wait, or :calculate.
Instance Method Summary collapse
- #all_workers_waiting? ⇒ Boolean
- #change(proc_id, state) ⇒ Object
- #change_to_sleep ⇒ Object
- #change_to_sleep_for_busy_system ⇒ Object
- #each_worker_id(&block) ⇒ Object
- #get_state(wid) ⇒ Object
-
#initialize(state_init, process_number, opts = {}) ⇒ State
constructor
A new instance of State.
- #ready_to_exit_after_task? ⇒ Boolean
- #request? ⇒ Boolean
- #request_task_number ⇒ Object
- #set_calculating_task(wid, task_id) ⇒ Object
- #set_exit_after_task ⇒ Object
- #set_finish_of_task(sent_task_id) ⇒ Object
- #sleep_with_auto_wakeup ⇒ Object
- #waiting_worker_id ⇒ Object
- #wakeup_automatically_for_unbusy_system ⇒ Object
- #wakeup_sleeping_worker ⇒ Object
Constructor Details
#initialize(state_init, process_number, opts = {}) ⇒ State
Returns a new instance of State.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/drbqs/node/state.rb', line 11 def initialize(state_init, process_number, opts = {}) @process_number = process_number @process_state = {} @process_number.times do |i| @process_state[i] = state_init end @calculating_task = {} @state_after_task = nil @load_average_threshold = opts[:max_loadavg] @sleep_time = opts[:sleep_time] || DEFAULT_SLEEP_TIME @auto_wakeup = nil end |
Instance Attribute Details
#calculating_task ⇒ Object (readonly)
Value of state is :sleep, :wait, or :calculate.
5 6 7 |
# File 'lib/drbqs/node/state.rb', line 5 def calculating_task @calculating_task end |
Instance Method Details
#all_workers_waiting? ⇒ Boolean
61 62 63 64 65 66 |
# File 'lib/drbqs/node/state.rb', line 61 def all_workers_waiting? each_worker_id.all? do |wid| st = get_state(wid) st == :wait || st == :exit end end |
#change(proc_id, state) ⇒ Object
98 99 100 101 102 103 |
# File 'lib/drbqs/node/state.rb', line 98 def change(proc_id, state) unless ALL_STATES.include?(state) raise ArgumentError, "Invalid state of node '#{state}'." end @process_state[proc_id] = state end |
#change_to_sleep ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/drbqs/node/state.rb', line 114 def change_to_sleep each_worker_id do |wid| st = get_state(wid) if st == :calculate @state_after_task = :sleep elsif st != :exit change(wid, :sleep) end end end |
#change_to_sleep_for_busy_system ⇒ Object
165 166 167 168 169 170 171 |
# File 'lib/drbqs/node/state.rb', line 165 def change_to_sleep_for_busy_system if system_busy? sleep_with_auto_wakeup return true end nil end |
#each_worker_id(&block) ⇒ Object
30 31 32 33 34 35 36 37 38 |
# File 'lib/drbqs/node/state.rb', line 30 def each_worker_id(&block) if block_given? @process_state.each do |key, val| yield(key) end else to_enum(:each_worker_id) end end |
#get_state(wid) ⇒ Object
26 27 28 |
# File 'lib/drbqs/node/state.rb', line 26 def get_state(wid) @process_state[wid] end |
#ready_to_exit_after_task? ⇒ Boolean
173 174 175 |
# File 'lib/drbqs/node/state.rb', line 173 def ready_to_exit_after_task? @state_after_task == :exit && all_workers_waiting? end |
#request? ⇒ Boolean
57 58 59 |
# File 'lib/drbqs/node/state.rb', line 57 def request? @state_after_task != :exit && request_task_number > 0 end |
#request_task_number ⇒ Object
50 51 52 53 54 55 |
# File 'lib/drbqs/node/state.rb', line 50 def request_task_number waiting = @process_state.select do |wid, state| state == :wait end waiting.size end |
#set_calculating_task(wid, task_id) ⇒ Object
68 69 70 71 |
# File 'lib/drbqs/node/state.rb', line 68 def set_calculating_task(wid, task_id) @calculating_task[task_id] = wid change(wid, :calculate) end |
#set_exit_after_task ⇒ Object
73 74 75 76 77 78 79 80 81 |
# File 'lib/drbqs/node/state.rb', line 73 def set_exit_after_task @state_after_task = :exit each_worker_id do |wid| st = get_state(wid) if (st == :wait) && (st == :sleep) change(wid, :exit) end end end |
#set_finish_of_task(sent_task_id) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/drbqs/node/state.rb', line 83 def set_finish_of_task(sent_task_id) sent_task_id.each do |task_id| if wid = @calculating_task.delete(task_id) case @state_after_task when :exit @process_state[wid] = :exit when :sleep @process_state[wid] = :sleep else @process_state[wid] = :wait end end end end |
#sleep_with_auto_wakeup ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/drbqs/node/state.rb', line 125 def sleep_with_auto_wakeup each_worker_id do |wid| if get_state(wid) == :wait change(wid, :sleep) end end @auto_wakeup = Time.now + @sleep_time end |
#waiting_worker_id ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/drbqs/node/state.rb', line 40 def waiting_worker_id ary = [] @process_state.each do |wid, state| if state == :wait ary << wid end end ary end |
#wakeup_automatically_for_unbusy_system ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/drbqs/node/state.rb', line 134 def wakeup_automatically_for_unbusy_system if @auto_wakeup && Time.now > @auto_wakeup && !system_busy? each_worker_id do |wid| if get_state(wid) == :sleep change(wid, :wait) end end @auto_wakeup = nil return true end nil end |
#wakeup_sleeping_worker ⇒ Object
105 106 107 108 109 110 111 112 |
# File 'lib/drbqs/node/state.rb', line 105 def wakeup_sleeping_worker each_worker_id do |wid| if get_state(wid) == :sleep change(wid, :wait) end end @state_after_task = nil end |