Class: Pione::Model::TaskWorkerBrokerModel
- Inherits:
-
Rootage::Model
- Object
- Rootage::Model
- Pione::Model::TaskWorkerBrokerModel
- Defined in:
- lib/pione/model/task-worker-broker-model.rb
Instance Attribute Summary collapse
-
#task_workers ⇒ Object
Returns the value of attribute task_workers.
-
#tuple_space ⇒ Object
Returns the value of attribute tuple_space.
-
#tuple_space_lock ⇒ Object
readonly
lock for tuple space table.
Instance Method Summary collapse
-
#add_tuple_space(tuple_space) ⇒ Object
Add the tuple space.
-
#create_task_worker(tuple_space) ⇒ Object
Create a task worker for the tuple space.
-
#delete_dead_task_workers ⇒ Object
Delete all dead task workers.
-
#delete_dead_tuple_spaces ⇒ Object
Delete all dead tuple spaces.
- #delete_task_worker(worker) ⇒ Object
-
#excess_task_workers ⇒ Object
Return excess number of task workers belong to the broker.
-
#get_tuple_space(tuple_space_id) ⇒ Object
Get the tuple space.
-
#initialize ⇒ TaskWorkerBrokerModel
constructor
A new instance of TaskWorkerBrokerModel.
-
#quantity ⇒ Object
Return number of task workers the broker manages.
-
#terminate_task_worker_if(&condition) ⇒ Object
Terminate first task worker that satisfies the condition.
-
#tuple_spaces ⇒ Object
Return known tuple spaces.
-
#update_tuple_spaces(tuple_spaces) ⇒ Object
Update tuple space list.
Methods inherited from Rootage::Model
#[], #[]=, #specified?, #specify, #to_hash
Constructor Details
#initialize ⇒ TaskWorkerBrokerModel
Returns a new instance of TaskWorkerBrokerModel.
8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 8 def initialize super @spawnings = 0 # number of current spawning task worker @task_worker_lock = Monitor.new # lock for task worker table @task_workers = Array.new # known task worker fronts @tuple_space_lock = Monitor.new @tuple_space = Hash.new # known tuple space table self[:spawn_task_worker] = true end |
Instance Attribute Details
#task_workers ⇒ Object
Returns the value of attribute task_workers.
4 5 6 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 4 def task_workers @task_workers end |
#tuple_space ⇒ Object
Returns the value of attribute tuple_space.
5 6 7 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 5 def tuple_space @tuple_space end |
#tuple_space_lock ⇒ Object (readonly)
lock for tuple space table
6 7 8 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 6 def tuple_space_lock @tuple_space_lock end |
Instance Method Details
#add_tuple_space(tuple_space) ⇒ Object
Add the tuple space.
21 22 23 24 25 26 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 21 def add_tuple_space(tuple_space) uuid = tuple_space.uuid # update tuple space table with the id @tuple_space_lock.synchronize {@tuple_space[uuid] = tuple_space} end |
#create_task_worker(tuple_space) ⇒ Object
Create a task worker for the tuple space. This method returns true if we suceeds to spawn the task worker, or returns false.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 30 def create_task_worker(tuple_space) res = true @task_worker_lock.synchronize do @spawnings += 1 # spawn a new process of pione-task-worker command if self[:spawn_task_worker] # make task worker's parameters param = { :features => Global.features, :tuple_space_id => tuple_space.uuid } begin spawner = Command::PioneTaskWorker.spawn(self, param) @task_workers << spawner.child_front spawner.when_terminated {delete_task_worker(spawner.child_front)} rescue Command::SpawnError => e Log::Debug.system("Task worker broker agent failed to spawn a task worker: %s" % e.) res = false end else @task_workers << Agent::TaskWorker.start(tuple_space, Global.expressional_features, @env) end @spawnings -= 1 end return res end |
#delete_dead_task_workers ⇒ Object
Delete all dead task workers.
67 68 69 70 71 72 73 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 67 def delete_dead_task_workers @task_worker_lock.synchronize do @task_workers.delete_if do |worker| not(Util.ignore_exception {timeout(1) {worker.ping}}) end end end |
#delete_dead_tuple_spaces ⇒ Object
Delete all dead tuple spaces.
76 77 78 79 80 81 82 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 76 def delete_dead_tuple_spaces @tuple_space_lock.synchronize do @tuple_space.delete_if do |_, space| not(Util.ignore_exception {timeout(1) {space.ping}}) end end end |
#delete_task_worker(worker) ⇒ Object
62 63 64 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 62 def delete_task_worker(worker) @task_worker_lock.synchronize {@task_workers.delete(worker)} end |
#excess_task_workers ⇒ Object
Return excess number of task workers belong to the broker.
85 86 87 88 89 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 85 def excess_task_workers @task_worker_lock.synchronize do @task_worker_size - @task_workers.size - @spawnings end end |
#get_tuple_space(tuple_space_id) ⇒ Object
Get the tuple space.
92 93 94 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 92 def get_tuple_space(tuple_space_id) @tuple_space_lock.synchronize {@tuple_space[tuple_space_id]} end |
#quantity ⇒ Object
Return number of task workers the broker manages.
97 98 99 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 97 def quantity @task_worker_lock.synchronize {@task_workers.size} end |
#terminate_task_worker_if(&condition) ⇒ Object
Terminate first task worker that satisfies the condition.
102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 102 def terminate_task_worker_if(&condition) @task_worker_lock.synchronize do @task_workers.each do |worker| if condition.call(worker) worker.terminate @task_workers.delete(worker) return true end end end return false end |
#tuple_spaces ⇒ Object
Return known tuple spaces.
116 117 118 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 116 def tuple_spaces @tuple_space_lock.synchronize {@tuple_space.values} end |
#update_tuple_spaces(tuple_spaces) ⇒ Object
Update tuple space list.
121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/pione/model/task-worker-broker-model.rb', line 121 def update_tuple_spaces(tuple_spaces) @tuple_space_lock.synchronize do # clear and update tuple space list @tuple_space.clear tuple_spaces.each {|tuple_space| add_tuple_space(tuple_space)} Log::Debug.system do list = @tuple_space.values.map{|space| space.__drburi} "Task worker broker has updated tuple space table: %s" % [list] end end end |