Class: Pione::Notification::TaskWorkerBrokerRecipient
- Defined in: lib/pione/notification/task-worker-broker-recipient.rb
Overview
TaskWorkerBrokerRecipient is a notification recipient for the task worker broker agent.
Instance Method Summary
- #clean ⇒ Object
  Clean the tuple space table.
- #get_tuple_space(uri) ⇒ Object
  Get a tuple space from the front server at the given URI.
- #initialize(model, front_uri, listener_uri) ⇒ TaskWorkerBrokerRecipient (constructor)
  A new instance of TaskWorkerBrokerRecipient.
- #receive_tuple_space(message) ⇒ Object
  Receive a "tuple space" message.
- #terminate ⇒ Object
  Terminate the recipient.
- #update_broker ⇒ Object
  Update the broker's tuple space list.
Methods inherited from Recipient
Constructor Details
#initialize(model, front_uri, listener_uri) ⇒ TaskWorkerBrokerRecipient
Returns a new instance of TaskWorkerBrokerRecipient.
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 11
def initialize(model, front_uri, listener_uri)
  super(front_uri, listener_uri)
  @model = model
  @tuple_space = {}
  @lock = Mutex.new

  # update broker's tuple spaces
  @thread = Thread.new do
    while true
      sleep 1
      clean
      update_broker
    end
  end
end
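The constructor starts a background thread that calls clean and update_broker roughly once per second, and #terminate later stops that thread. A minimal sketch of this periodic-worker pattern, independent of PIONE, might look like the following. PeriodicWorker, its interval parameter, and the counting block are hypothetical names used only for illustration.

```ruby
# Hypothetical helper, not part of PIONE: runs a block repeatedly on a
# background thread until #terminate is called.
class PeriodicWorker
  def initialize(interval, &task)
    @thread = Thread.new do
      loop do
        sleep interval
        task.call
      end
    end
  end

  # Stop the background thread and wait until it has finished.
  def terminate
    @thread.kill
    @thread.join
  end

  def alive?
    @thread.alive?
  end
end

ticks = 0
worker = PeriodicWorker.new(0.01) { ticks += 1 }
sleep 0.2          # let the worker run a few iterations
worker.terminate   # after this call the block is no longer invoked
```

Joining after the kill is a choice made in this sketch so that the caller knows the thread has really stopped; the method documented here only calls @thread.terminate.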
Instance Method Details
#clean ⇒ Object
Clean the tuple space table: remove every entry whose last notification is older than Global.tuple_space_disconnection_time.
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 68
def clean
  @lock.synchronize do
    now = Time.now
    dtime = Global.tuple_space_disconnection_time
    @tuple_space.delete_if {|_, holder| (now - holder[:last_time]) > dtime}
  end
end
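The expiry rule can be tried in isolation. The sketch below assumes a threshold of 5 seconds as a stand-in for Global.tuple_space_disconnection_time (the real value is not given here); drop_stale and the example URIs are hypothetical.

```ruby
# Remove every entry whose :last_time is older than the threshold (seconds).
def drop_stale(table, now, threshold)
  table.delete_if { |_uri, holder| (now - holder[:last_time]) > threshold }
end

# Assumed threshold, standing in for Global.tuple_space_disconnection_time.
disconnection_time = 5

now = Time.now
table = {
  "druby://host-a:1234" => { last_time: now - 1 },   # seen 1 second ago
  "druby://host-b:1234" => { last_time: now - 10 },  # seen 10 seconds ago
}

drop_stale(table, now, disconnection_time)
remaining = table.keys
```

Subtracting two Time objects yields the elapsed seconds as a Float, so the comparison against a numeric threshold works directly.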
#get_tuple_space(uri) ⇒ Object
Get a tuple space from the front server at the given URI.
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 53
def get_tuple_space(uri)
  # build a reference to provider front
  front = DRb::DRbObject.new_with_uri(uri)

  # return the tuple space reference
  Timeout.timeout(3) {front.tuple_space}
rescue Timeout::Error
  Log::Debug.notification do
    'tuple_space notfication ignored the provider "%s" that seems to be something bad' % front.uri
  end
rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e
  Log::Debug.notification('The tuple space at "%s" disconnected: %s' % [front.uri, e.message])
end
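The method bounds a remote call with Timeout.timeout, and its caller treats a falsy result as "no tuple space available". A minimal sketch of that bounded-call pattern, without any DRb connection, is shown below. fetch_with_timeout is a hypothetical helper, and returning nil explicitly on timeout is a choice of this sketch rather than something the method above is documented to do.

```ruby
require "timeout"

# Hypothetical helper: run the block, but give up and return nil if it
# takes longer than `seconds`.
def fetch_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  nil
end

# A call that answers immediately returns its value.
fast = fetch_with_timeout(1) { :tuple_space }

# A call that is slower than the limit is abandoned and yields nil.
slow = fetch_with_timeout(0.05) { sleep 1; :tuple_space }
```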
#receive_tuple_space(message) ⇒ Object
Receive a "tuple space" message. If the front URI in the message is already known, its timestamp is refreshed; otherwise the tuple space is fetched and registered.
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 35
def receive_tuple_space(message)
  uri = message["front"]
  if @tuple_space.has_key?(uri)
    @lock.synchronize {@tuple_space[uri][:last_time] = Time.now}
  else
    if tuple_space = get_tuple_space(uri)
      @lock.synchronize do
        @tuple_space[uri] = {:last_time => Time.now, :tuple_space => tuple_space}
      end
    end
  end
end
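The refresh-or-register logic can be sketched as a small standalone class. SeenTable, touch, and the :resource key are hypothetical names. Unlike the method above, this sketch holds the lock while the lookup block runs, which keeps the code short but would block other callers during a slow lookup.

```ruby
# Hypothetical table keeper, not part of PIONE.
class SeenTable
  def initialize
    @table = {}
    @lock = Mutex.new
  end

  # Refresh the timestamp for a known URI; otherwise call the block to
  # look the resource up and register it only if the lookup succeeded.
  def touch(uri, now = Time.now)
    @lock.synchronize do
      if @table.key?(uri)
        @table[uri][:last_time] = now
      elsif (resource = yield(uri))
        @table[uri] = { last_time: now, resource: resource }
      end
    end
  end

  def uris
    @lock.synchronize { @table.keys }
  end
end

table = SeenTable.new
table.touch("druby://host-a:1234") { |_uri| :fake_tuple_space }  # registered
table.touch("druby://host-b:1234") { |_uri| nil }                # lookup failed, skipped
table.touch("druby://host-a:1234") { |_uri| raise "not called for known URIs" }
known = table.uris
```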
#terminate ⇒ Object
Terminate the recipient.
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 29
def terminate
  super
  @thread.terminate
end
#update_broker ⇒ Object
Update the broker's tuple space list: pass every tuple space currently in the table to the model.
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 77
def update_broker
  @lock.synchronize do
    tuple_spaces = @tuple_space.values.map {|holder| holder[:tuple_space]}
    @model.update_tuple_spaces(tuple_spaces)
  end
end