Class: DRbQS::Node::TaskClient
- Inherits:
-
Object
- Object
- DRbQS::Node::TaskClient
- Defined in:
- lib/drbqs/node/task_client.rb
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#node_number ⇒ Object
readonly
Returns the value of attribute node_number.
Instance Method Summary collapse
- #add_new_task(num) ⇒ Object
-
#dequeue_task ⇒ nil, Array
Returns nil if @task_queue is empty.
- #dump_result_queue ⇒ Object
- #get_task ⇒ Object
- #get_task_by_group(grp) ⇒ Object
-
#initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new) ⇒ TaskClient
constructor
A new instance of TaskClient.
- #queue_result(task_id, result) ⇒ Object
- #queue_task(ary) ⇒ Object
- #result_empty? ⇒ Boolean
-
#send_result ⇒ Object
Returns an array of the task IDs that were sent to the server, or nil if nothing was sent.
- #task_empty? ⇒ Boolean
Constructor Details
#initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new) ⇒ TaskClient
Returns a new instance of TaskClient.
6 7 8 9 10 11 12 13 14 |
# File 'lib/drbqs/node/task_client.rb', line 6
#
# Sets up the client state for one node: the two remote endpoints, two local
# in-process queues (pending tasks and finished results), the group list and
# the logger.
#
# @param node_number [Object] identifier of this node; it is included in the
#   accept and result tuples written to +result+
# @param queue [Object] object that get_task_by_group calls +take+ on
#   (presumably a Rinda tuple space -- confirm against the caller)
# @param result [Object] object that receives +write+ for accept signals and results
# @param group [Array, nil] groups tried before the default group; nil becomes []
# @param logger [Object] receiver of +info+ calls
def initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new)
  @node_number = node_number
  @queue = queue
  @result = result
  @task_queue = Queue.new
  @result_queue = Queue.new
  # A nil group means "no extra groups", so iteration in get_task stays safe.
  @group = group || []
  @logger = logger
end
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
4 5 6 |
# File 'lib/drbqs/node/task_client.rb', line 4
#
# Read-only accessor.
#
# @return [Object] the current value of @group
def group
  @group
end
#node_number ⇒ Object (readonly)
Returns the value of attribute node_number.
4 5 6 |
# File 'lib/drbqs/node/task_client.rb', line 4
#
# Read-only accessor.
#
# @return [Object] the current value of @node_number
def node_number
  @node_number
end
Instance Method Details
#add_new_task(num) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/drbqs/node/task_client.rb', line 61
#
# Fetches up to +num+ tasks through get_task. For every task obtained, an
# accept tuple [:accept, task_id, node_number] is written to @result and the
# task is pushed onto the local task queue. Fetching stops as soon as
# get_task returns nothing.
#
# @param num [Integer] maximum number of tasks to fetch
# @return [Array, nil] IDs of the tasks that were accepted, or nil when no
#   task could be fetched
def add_new_task(num)
  accepted_ids = []
  num.times do
    task = get_task
    break unless task

    # The first element of a fetched task is its ID.
    task_id = task[0]
    @logger.info("Send accept signal: node #{@node_number} calculating #{task_id}")
    @result.write([:accept, task_id, @node_number])
    queue_task(task)
    accepted_ids << task_id
  end
  accepted_ids.empty? ? nil : accepted_ids
end
#dequeue_task ⇒ nil, Array
Returns nil if @task_queue is empty; otherwise returns an array [task_id, obj, method_name, args].
36 37 38 39 40 41 42 |
# File 'lib/drbqs/node/task_client.rb', line 36
#
# Takes the next entry off the local task queue without blocking on an
# empty queue: emptiness is checked first and nil is returned in that case.
#
# @return [Array, nil] the entry that was enqueued with queue_task, or nil
#   when the task queue is empty
def dequeue_task
  return nil if @task_queue.empty?

  @task_queue.deq
end
#dump_result_queue ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/drbqs/node/task_client.rb', line 93
#
# Drains the result queue and serializes the collected result values (the
# task IDs are discarded) with Marshal.
#
# @return [String, nil] Marshal dump of the array of results in dequeue
#   order, or nil when the result queue was empty
def dump_result_queue
  results = []
  until result_empty?
    # Each queued entry is a [task_id, result] pair; only the result is kept.
    _task_id, res = dequeue_result
    results << res
  end
  results.empty? ? nil : Marshal.dump(results)
end
#get_task ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/drbqs/node/task_client.rb', line 52
#
# Looks for a task in each of this client's groups, in order, and falls back
# to DRbQS::Task::DEFAULT_GROUP when none of them yields one.
#
# @return [Array, nil] whatever get_task_by_group returns for the first
#   group that has a task, or its result for the default group (possibly nil)
def get_task
  @group.each do |grp|
    task = get_task_by_group(grp)
    return task if task
  end
  get_task_by_group(DRbQS::Task::DEFAULT_GROUP)
end
#get_task_by_group(grp) ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/drbqs/node/task_client.rb', line 44
#
# Takes one task tuple for the given group from @queue, using a timeout of 0
# so the call does not wait for a task to appear.
#
# The template matches five-element tuples whose first element is +grp+,
# whose second is an Integer and whose fourth is a Symbol; the nil slots
# match anything. Integer is used rather than Fixnum: Fixnum was deprecated
# in Ruby 2.4 and later removed, and Integer also covers arbitrarily large IDs.
#
# @param grp [Object] group value expected in the first tuple slot
# @return [Array, nil] the matched tuple without its leading group element
#   (documented elsewhere on this page as [task_id, obj, method_name, args]),
#   or nil when the take expires without a match
def get_task_by_group(grp)
  @queue.take([grp, Integer, nil, Symbol, nil], 0)[1..-1]
rescue Rinda::RequestExpiredError
  nil
end
#queue_result(task_id, result) ⇒ Object
89 90 91 |
# File 'lib/drbqs/node/task_client.rb', line 89
#
# Appends a finished task's result to the local result queue as a
# [task_id, result] pair.
#
# @param task_id [Object] ID of the task the result belongs to
# @param result [Object] the result value
# @return [Object] whatever @result_queue.enq returns
def queue_result(task_id, result)
  @result_queue.enq([task_id, result])
end
#queue_task(ary) ⇒ Object
30 31 32 |
# File 'lib/drbqs/node/task_client.rb', line 30
#
# Appends a task to the local task queue.
#
# @param ary [Array] the task as fetched by get_task; add_new_task reads its
#   first element as the task ID
# @return [Object] whatever @task_queue.enq returns
def queue_task(ary)
  @task_queue.enq(ary)
end
#result_empty? ⇒ Boolean
20 21 22 |
# File 'lib/drbqs/node/task_client.rb', line 20
#
# @return [Boolean] true when no result is waiting in the local result queue
def result_empty?
  @result_queue.empty?
end
#send_result ⇒ Object
Returns an array of the task IDs that were sent to the server, or nil if nothing was sent.
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/drbqs/node/task_client.rb', line 78
#
# Drains the local result queue, writing one
# [:result, task_id, node_number, result] tuple to @result per entry.
#
# @return [Array, nil] IDs of the tasks whose results were written, in the
#   order they were dequeued, or nil when the result queue was empty
def send_result
  sent_task_id = []
  until result_empty?
    task_id, result = dequeue_result
    # The inspected result is handed to the logger as a block, not as a
    # pre-built string.
    @logger.info("Send result: #{task_id}") { result.inspect }
    @result.write([:result, task_id, @node_number, result])
    sent_task_id << task_id
  end
  sent_task_id.empty? ? nil : sent_task_id
end
#task_empty? ⇒ Boolean
16 17 18 |
# File 'lib/drbqs/node/task_client.rb', line 16
#
# @return [Boolean] true when no task is waiting in the local task queue
def task_empty?
  @task_queue.empty?
end