Class: DRbQS::Server::Queue
- Inherits:
-
Object
- Object
- DRbQS::Server::Queue
- Defined in:
- lib/drbqs/server/queue.rb
Instance Attribute Summary collapse
-
#calculating ⇒ Object
readonly
Returns the value of attribute calculating.
-
#history ⇒ Object
readonly
Returns the value of attribute history.
Instance Method Summary collapse
-
#add(task) ⇒ Fixnum
Task ID (for debug).
- #all_logs ⇒ Object
- #calculating_nodes ⇒ Object
-
#calculating_task_message ⇒ Object
Return a hash of which keys are node ID number and values are an array of pairs of task ID number and its message.
- #calculating_task_number ⇒ Object
-
#empty? ⇒ Boolean
If queue is empty, that is, there is no tasks to calculate next, this method returns true.
- #exec_task_hook(main_server, task_id, result) ⇒ Object
-
#finished? ⇒ Boolean
If there are no tasks in queue and calculating, return true.
- #finished_task_number ⇒ Object
- #get_accept_signal ⇒ Object
- #get_result(main_server) ⇒ Object
-
#initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new) ⇒ Queue
constructor
A new instance of Queue.
- #requeue_for_deleted_node_id(deleted) ⇒ Object
- #stocked_task_number ⇒ Object
Constructor Details
#initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new) ⇒ Queue
Returns a new instance of Queue.
9 10 11 12 13 14 15 16 17 |
# File 'lib/drbqs/server/queue.rb', line 9 def initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new) @queue = queue @result = result @task_id = 0 @cache = {} @calculating = Hash.new { |hash, key| hash[key] = Array.new } @history = DRbQS::Server::TaskHistory.new @logger = logger end |
Instance Attribute Details
#calculating ⇒ Object (readonly)
Returns the value of attribute calculating.
7 8 9 |
# File 'lib/drbqs/server/queue.rb', line 7 def calculating @calculating end |
#history ⇒ Object (readonly)
Returns the value of attribute history.
7 8 9 |
# File 'lib/drbqs/server/queue.rb', line 7 def history @history end |
Instance Method Details
#add(task) ⇒ Fixnum
Returns task ID (for debug).
25 26 27 28 29 30 31 32 |
# File 'lib/drbqs/server/queue.rb', line 25 def add(task) @task_id += 1 @logger.info("New task: #{@task_id}") @cache[@task_id] = task queue_task(@task_id) @history.set(@task_id, :add, task.note) @task_id end |
#all_logs ⇒ Object
141 142 143 |
# File 'lib/drbqs/server/queue.rb', line 141 def all_logs @history.log_strings end |
#calculating_nodes ⇒ Object
145 146 147 148 149 150 151 152 153 |
# File 'lib/drbqs/server/queue.rb', line 145 def calculating_nodes nodes = [] @calculating.each do |node_id, tasks| if tasks.size > 0 nodes << node_id end end nodes.sort! end |
#calculating_task_message ⇒ Object
Return a hash of which keys are node ID number and values are an array of pairs of task ID number and its message.
105 106 107 108 109 110 111 112 113 |
# File 'lib/drbqs/server/queue.rb', line 105 def mes = {} @calculating.each do |node_id, task_id_ary| mes[node_id] = task_id_ary.map do |n| [n, @history.events(n)[0][2]] end end mes end |
#calculating_task_number ⇒ Object
115 116 117 |
# File 'lib/drbqs/server/queue.rb', line 115 def calculating_task_number @calculating.inject(0) { |s, key_val| s + key_val[1].size } end |
#empty? ⇒ Boolean
If queue is empty, that is, there is no tasks to calculate next, this method returns true. Otherwise, false. Even if there are calculating tasks, the method can return true.
131 132 133 |
# File 'lib/drbqs/server/queue.rb', line 131 def empty? stocked_task_number == 0 end |
#exec_task_hook(main_server, task_id, result) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/drbqs/server/queue.rb', line 73 def exec_task_hook(main_server, task_id, result) if task = @cache.delete(task_id) if task.exec_hook(main_server, result) @history.set(task_id, :hook) end true else @logger.error("Task #{task_id} is not cached.") false end end |
#finished? ⇒ Boolean
If there are no tasks in queue and calculating, return true. Otherwise, false.
137 138 139 |
# File 'lib/drbqs/server/queue.rb', line 137 def finished? @cache.size == 0 end |
#finished_task_number ⇒ Object
123 124 125 |
# File 'lib/drbqs/server/queue.rb', line 123 def finished_task_number @history.finished_task_number end |
#get_accept_signal ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/drbqs/server/queue.rb', line 34 def get_accept_signal count = 0 begin loop do sym, task_id, node_id = @result.take([:accept, Fixnum, Fixnum], 0) count += 1 @calculating[node_id] << task_id @history.set(task_id, :calculate, node_id) @logger.info("Accept: task #{task_id} by node #{node_id}.") end rescue Rinda::RequestExpiredError @logger.debug("Accept: #{count} signals.") end count end |
#get_result(main_server) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/drbqs/server/queue.rb', line 85 def get_result(main_server) count = 0 begin loop do get_accept_signal sym, task_id, node_id, result = @result.take([:result, Fixnum, Fixnum, nil], 0) count += 1 @history.set(task_id, :result, node_id) @logger.info("Get: result of #{task_id} from node #{node_id}.") delete_task_id(node_id, task_id) exec_task_hook(main_server, task_id, result) end rescue Rinda::RequestExpiredError @logger.debug("Get: #{count} results.") end count end |
#requeue_for_deleted_node_id(deleted) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/drbqs/server/queue.rb', line 50 def requeue_for_deleted_node_id(deleted) deleted.each do |node_id| if task_id_ary = @calculating[node_id] task_id_ary.each do |task_id| queue_task(task_id) @history.set(task_id, :requeue) @logger.info("Requeue: task #{task_id}.") end @calculating.delete(node_id) end end end |
#stocked_task_number ⇒ Object
119 120 121 |
# File 'lib/drbqs/server/queue.rb', line 119 def stocked_task_number @cache.size - calculating_task_number end |