Class: Roby::Distributed::PeerServer
- Includes:
- DRbUndumped
- Defined in:
- lib/roby/distributed/peer.rb,
lib/roby/distributed/transaction.rb,
lib/roby/distributed/subscription.rb,
lib/roby/distributed/communication.rb,
lib/roby/distributed/notifications.rb,
lib/roby/distributed/distributed_object.rb
Overview
PeerServer objects act as servers for the plan managers we are connected to, i.e. they process the messages sent by those remote plan managers.
The client part, that is the part which actually sends the messages, is a Peer object accessible through the #peer attribute.
Instance Attribute Summary
-
#current_message_id ⇒ Object
The ID of the message we are currently processing.
-
#peer ⇒ Object
readonly
The Peer object we are associated to.
-
#triggers ⇒ Object
readonly
The set of triggers our peer has added to our plan.
Instance Method Summary
-
#add_owner(object, owner) ⇒ Object
Message received when owner is a peer which now owns object.
-
#add_trigger(id, matcher) ⇒ Object
The peer asks to be notified if a plan object which matches matcher changes.
-
#added_sibling(local_id, remote_id) ⇒ Object
Called by the remote peer to announce that it has created the given siblings.
-
#completed(result, error, id) ⇒ Object
Message received when a given call, identified by its ID, has been processed on the remote peer.
-
#completed!(result, error) ⇒ Object
Queue a completion message for our peer.
-
#completion_group(from_id, to_id) ⇒ Object
Message received to describe a group of consecutive calls that have been completed, when all those calls return nil.
-
#create_sibling(marshalled_object) ⇒ Object
Message sent when our remote peer requests that we create a local representation for one of its objects.
-
#disconnect ⇒ Object
Message received when our peer is closing the connection.
-
#discover_neighborhood(object, distance) ⇒ Object
Send the neighborhood of distance hops around object to the peer.
-
#discover_plan(marshalled_plan, m_tasks, m_events) ⇒ Object
Called by the remote host because it has subscribed us to a plan (a set of tasks and events).
-
#done_synchro_point ⇒ Object
Message received when the synchro point is finished.
-
#event_add_propagation(only_forward, marshalled_from, marshalled_to, event_id, time, context) ⇒ Object
Message received when the marshalled_from generator has either been forwarded (only_forward = true) or signals (only_forward = false) the marshalled_to generator.
-
#event_fired(marshalled_from, event_id, time, context) ⇒ Object
Message received when the marshalled_from generator fired an event, with the given event id, time and context.
-
#event_for(generator, event_id, time, context) ⇒ Object
Creates an Event object for generator, with the given arguments as parameters, or returns an already existing one.
-
#execute ⇒ Object
call-seq: execute { … }.
-
#fatal_error(error, msg, args) ⇒ Object
Message received when an unrecoverable error occurred on the remote side.
-
#initialize(peer) ⇒ PeerServer
constructor
Create a PeerServer object for the given peer.
-
#local_name ⇒ Object
The name of the local ConnectionSpace object we are acting on.
-
#plan ⇒ Object
The plan object which is used as a facade for our peer.
-
#plan_discover(plan, m_tasks, m_relations) ⇒ Object
Message received when the set of tasks m_tasks has been discovered by the remote plan.
-
#plan_remove_object(plan, object) ⇒ Object
Message received when object has been removed from plan.
-
#plan_replace(plan, m_from, m_to) ⇒ Object
Message received when m_from has been replaced by m_to in the plan.
-
#plan_set_mission(plan, task, flag) ⇒ Object
Message received when task has become a mission (flag = true), or has become a non-mission (flag = false) in plan.
-
#prepare_remove_owner(object, owner) ⇒ Object
Message received before #remove_owner, to verify if the removal operation can be done or not.
-
#query_result_set(query) ⇒ Object
Applies the matcher given in query on the local plan and sends back the result.
-
#remote_name ⇒ Object
The name of the remote peer.
-
#remove_owner(object, owner) ⇒ Object
Message received when owner does not own object anymore.
-
#remove_trigger(id) ⇒ Object
Remove the trigger id defined by this peer.
-
#removed_sibling(local_id, remote_id) ⇒ Object
Called by the remote peer to announce that it has removed the given siblings.
-
#set_relations(object, relations) ⇒ Object
Sets the relations of object according to the description in relations.
-
#set_relations_commands(plan_object) ⇒ Object
Sends to the peer the set of relations needed to copy the state of plan_object on the remote peer.
-
#state_update(new_state) ⇒ Object
Message received to update our view of the remote robot state.
-
#subscribe(m_object) ⇒ Object
Subscribe the remote peer to changes on object.
-
#subscribe_plan(sibling) ⇒ Object
The peer wants to subscribe to our main plan.
-
#subscribe_plan_object(object) ⇒ Object
Called by the peer to subscribe on object.
-
#subscribed(objects) ⇒ Object
Called by the remote peer to announce that it has subscribed us to objects.
-
#subscribed_plan(remote_plan_id) ⇒ Object
Called by our peer because it has subscribed us to its main plan.
-
#synchro_point ⇒ Object
Message received when the first half of a synchro point is reached.
-
#to_s ⇒ Object
:nodoc:.
-
#transaction_abandon_commit(trsc, error) ⇒ Object
Message received when a transaction commit is to be abandoned.
-
#transaction_commit(trsc) ⇒ Object
Message received when a transaction commit is requested.
-
#transaction_discard(trsc) ⇒ Object
Message received when a transaction discard is requested.
-
#transaction_give_token(trsc, needs_edition) ⇒ Object
Message received when the transaction edition token is given to this plan manager.
-
#transaction_prepare_commit(trsc) ⇒ Object
Message received when the ‘prepare’ stage of the transaction commit is requested.
-
#trigger(*objects) ⇒ Object
Activate any trigger that may exist on objects. It sends the PeerServer#triggered message for each object that actually matches a registered trigger.
-
#triggered(id, task) ⇒ Object
Message received when task has matched the trigger referenced by id.
-
#update_relation(plan, m_from, op, m_to, m_rel, m_info = nil) ⇒ Object
Message received when a relation graph has been updated.
-
#updated_arguments(task, arguments) ⇒ Object
Message received to announce that the arguments of task have been modified.
-
#updated_data(task, data) ⇒ Object
Message received to announce that the internal data of task is now data.
Constructor Details
#initialize(peer) ⇒ PeerServer
Create a PeerServer object for the given peer
    # File 'lib/roby/distributed/peer.rb', line 111
    def initialize(peer)
      @peer = peer
      @triggers = Hash.new
    end
Instance Attribute Details
#current_message_id ⇒ Object
The ID of the message we are currently processing
    # File 'lib/roby/distributed/communication.rb', line 476
    def current_message_id
      @current_message_id
    end
#peer ⇒ Object (readonly)
The Peer object we are associated to
    # File 'lib/roby/distributed/peer.rb', line 105
    def peer
      @peer
    end
#triggers ⇒ Object (readonly)
The set of triggers our peer has added to our plan
    # File 'lib/roby/distributed/peer.rb', line 108
    def triggers
      @triggers
    end
Instance Method Details
#add_owner(object, owner) ⇒ Object
Message received when owner is a peer which now owns object
    # File 'lib/roby/distributed/distributed_object.rb', line 168
    def add_owner(object, owner)
      peer.local_object(object).add_owner(peer.local_object(owner), false)
      nil
    end
#add_trigger(id, matcher) ⇒ Object
The peer asks to be notified if a plan object which matches matcher changes
    # File 'lib/roby/distributed/peer.rb', line 150
    def add_trigger(id, matcher)
      triggers[id] = [matcher, (triggered = ValueSet.new)]
      Roby.info "#{remote_name} wants notification on #{matcher} (#{id})"

      peer.queueing do
        matcher.each(plan) do |task|
          if !triggered.include?(task)
            triggered << task
            peer.transmit(:triggered, id, task)
          end
        end
      end
      nil
    end
#added_sibling(local_id, remote_id) ⇒ Object
Called by the remote peer to announce that it has created the given siblings. siblings is a remote_drbobject => local_object hash
It is also used by BasicObject#sibling_of to register a new sibling
    # File 'lib/roby/distributed/subscription.rb', line 106
    def added_sibling(local_id, remote_id)
      local_id.local_object.add_sibling_for(peer, remote_id)
      nil
    end
#completed(result, error, id) ⇒ Object
Message received when a given call, identified by its ID, has been processed on the remote peer. result is the value returned by the method, error an exception object (if an error occurred).
    # File 'lib/roby/distributed/communication.rb', line 505
    def completed(result, error, id)
      call_spec = peer.completion_queue.pop
      if call_spec. != id
        result = Exception.exception("something fishy: ID mismatch in completion queue (#{call_spec.} != #{id}")
        error = true
        call_spec = nil
      end
      if error
        if call_spec && thread = call_spec.waiting_thread
          result = peer.local_object(result)
          thread.raise result
        else
          Roby::Distributed.fatal "fatal error in communication with #{peer}: #{result.}"
          Roby::Distributed.fatal "disconnecting ..."
          if peer.connected?
            peer.disconnect
          else
            peer.disconnected!
          end
        end
      elsif call_spec
        peer.call_attached_block(call_spec, result)
      end
      nil
    end
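The ordering check in #completed relies on completions arriving in the same order as the calls were sent. The following is only a minimal sketch of that idea, not Roby's implementation: `PendingCall` and `CompletionQueue` are hypothetical names, an Array with `shift` stands in for the peer's completion queue, and the sketch raises on a mismatch where the real method reports a fatal error and disconnects.

```ruby
# Hypothetical stand-in for the description of a call awaiting completion.
PendingCall = Struct.new(:id, :on_success)

# Hypothetical FIFO of pending calls; not a Roby class.
class CompletionQueue
  def initialize
    @queue = []
  end

  def push(call)
    @queue << call
  end

  # The acknowledged ID must match the oldest pending call.
  def completed(result, error, id)
    call = @queue.shift
    unless call && call.id == id
      raise ArgumentError, "ID mismatch in completion queue (expected #{call ? call.id : 'nothing'}, got #{id})"
    end
    raise result if error # result is assumed to be an exception in this case

    call.on_success&.call(result)
    nil
  end
end

results = []
queue = CompletionQueue.new
queue.push(PendingCall.new(1, ->(value) { results << value }))
queue.push(PendingCall.new(2, nil))

queue.completed(:ok, false, 1) # matches the head of the queue

mismatch_detected =
  begin
    queue.completed(nil, false, 7) # head of the queue is call 2
    false
  rescue ArgumentError
    true
  end
```

Raising keeps the sketch short; a real server would more likely turn the mismatch into a connection-level failure, as the source above does.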
#completed!(result, error) ⇒ Object
Queue a completion message for our peer. This is usually done automatically in #demux, but it is useful to do it manually in certain conditions, for instance in PeerServer#execute
In #execute, the control thread -> RX thread context switch is not immediate. Therefore, it is possible that events are queued by the control thread while the #completed message is not. #completed! both queues the message and makes sure that #demux won’t.
    # File 'lib/roby/distributed/communication.rb', line 542
    def completed!(result, error)
      if queued_completion?
        raise "already queued the completed message"
      else
        Distributed.debug { "done, returns #{'error ' if error}#{result || 'nil'} in completed!" }
        self.queued_completion = true
        peer.queue_call false, :completed, [result, error, ]
      end
    end
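The guard in #completed! ensures that a completion message is queued at most once per processed call. A minimal sketch of that flag-based guard follows; `CompletionGuard` and its `queued` list are hypothetical and only stand in for the peer's outgoing queue.

```ruby
# Hypothetical stand-in: records what would be queued for the peer.
class CompletionGuard
  attr_reader :queued

  def initialize
    @queued_completion = false
    @queued = []
  end

  def queued_completion?
    @queued_completion
  end

  # Queue the completion once; a second attempt is a programming error.
  def completed!(result, error)
    raise "already queued the completed message" if queued_completion?

    @queued_completion = true
    @queued << [:completed, result, error]
    nil
  end
end

guard = CompletionGuard.new
guard.completed!(:done, false)

second_attempt_rejected =
  begin
    guard.completed!(:done, false)
    false
  rescue RuntimeError
    true
  end
```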
#completion_group(from_id, to_id) ⇒ Object
Message received to describe a group of consecutive calls that have been completed, when all those calls return nil. This is simply an optimization of the communication protocol, as most remote calls return nil.
from_id is the ID of the first call of the group and to_id the last. Both are included in the group.
    # File 'lib/roby/distributed/communication.rb', line 494
    def completion_group(from_id, to_id)
      for id in (from_id..to_id)
        completed(nil, nil, id)
      end
      nil
    end
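The expansion of a completion group into individual completions can be tried out without Roby. In this minimal sketch, `RecordingServer` is a hypothetical stand-in that only records the arguments passed to `completed`; it is meant to illustrate the inclusive range, nothing more.

```ruby
# Hypothetical stand-in for PeerServer: it only records completions.
class RecordingServer
  attr_reader :completions

  def initialize
    @completions = []
  end

  # Same parameters as PeerServer#completed; here it just stores them.
  def completed(result, error, id)
    @completions << [result, error, id]
    nil
  end

  # Acknowledge every call from from_id to to_id, both included.
  def completion_group(from_id, to_id)
    (from_id..to_id).each { |id| completed(nil, nil, id) }
    nil
  end
end

server = RecordingServer.new
server.completion_group(3, 6)
acknowledged_ids = server.completions.map { |_result, _error, id| id }
```

A single group message thus replaces one message per call whenever a run of consecutive calls all returned nil.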
#create_sibling(marshalled_object) ⇒ Object
Message sent when our remote peer requests that we create a local representation for one of its objects. It therefore creates a sibling for marshalled_object, which is a representation of a distributed object present on our peer.
It calls #created_sibling on marshalled_object with the new created sibling, to allow for specific operations to be done on it.
    # File 'lib/roby/distributed/distributed_object.rb', line 155
    def create_sibling(marshalled_object)
      object_remote_id = peer.remote_object(marshalled_object)
      if sibling = peer.proxies[object_remote_id]
        raise ArgumentError, "#{marshalled_object} has already a sibling (#{sibling})"
      end

      sibling = marshalled_object.sibling(peer)
      peer.subscriptions << object_remote_id
      marshalled_object.created_sibling(peer, sibling)
      nil
    end
#disconnect ⇒ Object
Message received when our peer is closing the connection
    # File 'lib/roby/distributed/communication.rb', line 409
    def disconnect
      peer.disconnected
      nil
    end
#discover_neighborhood(object, distance) ⇒ Object
Send the neighborhood of distance hops around object to the peer
    # File 'lib/roby/distributed/peer.rb', line 179
    def discover_neighborhood(object, distance)
      object = peer.local_object(object)
      edges = object.neighborhood(distance)
      if object.respond_to?(:each_plan_child)
        object.each_plan_child do |plan_child|
          edges += plan_child.neighborhood(distance)
        end
      end

      # Replace the relation graphs by their name
      edges.delete_if do |rel, from, to, info|
        !(rel.distribute? && from.distribute? && to.distribute?)
      end
      edges
    end
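The final filtering step keeps an edge only when the relation and both of its endpoints are distributable. The sketch below illustrates that filter on made-up data; `Node` is a hypothetical struct that merely answers `distribute?`, and the edge layout `[rel, from, to, info]` is taken from the block parameters in the source above.

```ruby
# Hypothetical stand-in: anything that answers distribute?.
Node = Struct.new(:name, :distributable) do
  def distribute?
    distributable
  end
end

depends_on = Node.new("depends_on", true)
local_note = Node.new("local_note", false)
a = Node.new("a", true)
b = Node.new("b", true)
c = Node.new("c", false)

edges = [
  [depends_on, a, b, nil], # kept: everything is distributable
  [depends_on, a, c, nil], # dropped: c is local-only
  [local_note, a, b, nil]  # dropped: the relation itself is local-only
]

# Same predicate as in discover_neighborhood.
edges.delete_if do |rel, from, to, _info|
  !(rel.distribute? && from.distribute? && to.distribute?)
end

kept = edges.map { |rel, from, to, _info| [rel.name, from.name, to.name] }
```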
#discover_plan(marshalled_plan, m_tasks, m_events) ⇒ Object
Called by the remote host because it has subscribed us to a plan (a set of tasks and events).
    # File 'lib/roby/distributed/subscription.rb', line 91
    def discover_plan(marshalled_plan, m_tasks, m_events)
      plan = peer.local_object(marshalled_plan)
      Distributed.update(plan) do
        peer.local_object(m_tasks)
        peer.local_object(m_events)
      end
      nil
    end
#done_synchro_point ⇒ Object
Message received when the synchro point is finished.
    # File 'lib/roby/distributed/communication.rb', line 485
    def done_synchro_point; end
#event_add_propagation(only_forward, marshalled_from, marshalled_to, event_id, time, context) ⇒ Object
Message received when the marshalled_from generator has either been forwarded (only_forward = true) or signals (only_forward = false) the marshalled_to generator. The remaining information describes the event itself.
    # File 'lib/roby/distributed/notifications.rb', line 444
    def event_add_propagation(only_forward, marshalled_from, marshalled_to, event_id, time, context)
      from_generator = peer.local_object(marshalled_from)
      to_generator = peer.local_object(marshalled_to)
      context = peer.local_object(context)

      event = event_for(from_generator, event_id, time, context)

      # Only add the signalling if we own +to+
      if to_generator.self_owned?
        Propagation.add_event_propagation(only_forward, [event], to_generator, event.context, nil)
      else
        # Call #signalling or #forwarding to make
        # +from_generator+ look like as if the event was really
        # fired locally ...
        Distributed.update_all([from_generator.root_object, to_generator.root_object]) do
          if only_forward then from_generator.forwarding(event, to_generator)
          else from_generator.signalling(event, to_generator)
          end
        end
      end
      nil
    end
#event_fired(marshalled_from, event_id, time, context) ⇒ Object
Message received when the marshalled_from generator fired an event, with the given event id, time and context.
    # File 'lib/roby/distributed/notifications.rb', line 426
    def event_fired(marshalled_from, event_id, time, context)
      from_generator = peer.local_object(marshalled_from)
      context = peer.local_object(context)

      event = event_for(from_generator, event_id, time, context)

      event.send(:propagation_id=, Propagation.propagation_id)
      from_generator.instance_variable_set("@happened", true)
      from_generator.fired(event)
      from_generator.call_handlers(event)

      nil
    end
#event_for(generator, event_id, time, context) ⇒ Object
Creates an Event object for generator, with the given argument as parameters, or returns an already existing one
    # File 'lib/roby/distributed/notifications.rb', line 409
    def event_for(generator, event_id, time, context)
      id, event = pending_events[generator]
      if id && id == event_id
        return event
      end

      event = generator.new(context)
      event.send(:time=, time)
      if generator.respond_to?(:task)
        generator.task.update_task_status(event)
      end
      pending_events[generator] = [event_id, event]
      event
    end
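The method caches the last event created for each generator, so that several messages carrying the same event id resolve to one Event object. Here is a minimal sketch of that caching, under the assumption that one pending entry per generator is enough; `FakeEvent`, `FakeGenerator` and `EventCache` are hypothetical and not Roby classes.

```ruby
# Hypothetical stand-ins for Roby's event and generator objects.
FakeEvent = Struct.new(:context, :time)

class FakeGenerator
  # Mirrors the generator.new(context) call used in event_for.
  def new(context)
    FakeEvent.new(context, nil)
  end
end

class EventCache
  def initialize
    @pending_events = {} # generator => [event_id, event]
  end

  def event_for(generator, event_id, time, context)
    id, event = @pending_events[generator]
    return event if id && id == event_id

    event = generator.new(context)
    event.time = time
    @pending_events[generator] = [event_id, event]
    event
  end
end

cache = EventCache.new
generator = FakeGenerator.new
first = cache.event_for(generator, 42, Time.at(0), [:ctx])
again = cache.event_for(generator, 42, Time.at(0), [:ctx])
other = cache.event_for(generator, 43, Time.at(1), [:ctx])
```

Here `first` and `again` are the same object, while `other` is a new event that replaces the cached entry for the generator.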
#execute ⇒ Object
call-seq: execute { … }
Executes the given block in the control thread and return when the block has finished its execution. This method can be called only when serving a remote call.
    # File 'lib/roby/distributed/communication.rb', line 558
    def execute
      if !processing?
        return yield
      end

      Roby.execute do
        error = nil
        begin
          result = yield
        rescue Exception => error
        end
        completed!(error || result, !!error, peer.)
      end
    end
#fatal_error(error, msg, args) ⇒ Object
Message received when an unrecoverable error occurred on the remote side.
    # File 'lib/roby/distributed/communication.rb', line 403
    def fatal_error(error, msg, args)
      Distributed.fatal "remote reports #{peer.local_object(error)} while processing #{msg}(#{args.join(", ")})"
      disconnect
    end
#local_name ⇒ Object
The name of the local ConnectionSpace object we are acting on
    # File 'lib/roby/distributed/peer.rb', line 135
    def local_name; peer.local_name end
#plan ⇒ Object
The plan object which is used as a facade for our peer
    # File 'lib/roby/distributed/peer.rb', line 140
    def plan; peer.connection_space.plan end
#plan_discover(plan, m_tasks, m_relations) ⇒ Object
Message received when the set of tasks m_tasks has been discovered by the remote plan. m_relations describes the internal relations between elements of m_tasks. It is in a format suitable for PeerServer#set_relations.
    # File 'lib/roby/distributed/notifications.rb', line 207
    def plan_discover(plan, m_tasks, m_relations)
      Distributed.update(plan = peer.local_object(plan)) do
        tasks = peer.local_object(m_tasks).to_value_set
        Distributed.update_all(tasks) do
          plan.discover(tasks)
          m_relations.each_slice(2) do |obj, rel|
            set_relations(obj, rel)
          end
        end
      end
      nil
    end
#plan_remove_object(plan, object) ⇒ Object
Message received when object has been removed from plan
    # File 'lib/roby/distributed/notifications.rb', line 239
    def plan_remove_object(plan, object)
      if local = peer.local_object(object, false)
        # Beware, transaction proxies have no 'plan' attribute
        plan = peer.local_object(plan)
        Distributed.update(plan) do
          Distributed.update(local) do
            plan.remove_object(local)
          end
        end
        local.forget_peer(peer)
      end

    rescue ArgumentError => e
      if e. =~ /has not been included in this plan/
        Roby::Distributed.warn "filtering the 'not included in this plan bug'"
      else
        raise
      end
    end
#plan_replace(plan, m_from, m_to) ⇒ Object
Message received when m_from has been replaced by m_to in the plan
    # File 'lib/roby/distributed/notifications.rb', line 222
    def plan_replace(plan, m_from, m_to)
      Distributed.update(plan = peer.local_object(plan)) do
        from, to = peer.local_object(m_from), peer.local_object(m_to)

        Distributed.update_all([from, to]) { plan.replace(from, to) }

        # Subscribe to the new task if the old task was subscribed
        # +from+ will be unsubscribed when it is finalized
        if peer.subscribed?(from) && !peer.subscribed?(to)
          peer.subscribe(to)
          nil
        end
      end
      nil
    end
#plan_set_mission(plan, task, flag) ⇒ Object
Message received when task has become a mission (flag = true), or has become a non-mission (flag = false) in plan.
    # File 'lib/roby/distributed/notifications.rb', line 188
    def plan_set_mission(plan, task, flag)
      plan = peer.local_object(plan)
      task = peer.local_object(task)
      if plan.owns?(task)
        if flag
          plan.insert(task)
        else
          plan.discard(task)
        end
      else
        task.mission = flag
      end
      nil
    end
#prepare_remove_owner(object, owner) ⇒ Object
Message received before #remove_owner, to verify if the removal operation can be done or not.
    # File 'lib/roby/distributed/distributed_object.rb', line 179
    def prepare_remove_owner(object, owner)
      peer.local_object(object).prepare_remove_owner(peer.local_object(owner))
      nil
    rescue
      $!
    end
#query_result_set(query) ⇒ Object
Applies the matcher given in query on the local plan and sends back the result
    # File 'lib/roby/distributed/peer.rb', line 143
    def query_result_set(query)
      plan.query_result_set(peer.local_object(query)).
        delete_if { |obj| !obj.distribute? }
    end
#remote_name ⇒ Object
The name of the remote peer
    # File 'lib/roby/distributed/peer.rb', line 137
    def remote_name; peer.remote_name end
#remove_owner(object, owner) ⇒ Object
Message received when owner does not own object anymore
    # File 'lib/roby/distributed/distributed_object.rb', line 173
    def remove_owner(object, owner)
      peer.local_object(object).remove_owner(peer.local_object(owner), false)
      nil
    end
#remove_trigger(id) ⇒ Object
Remove the trigger id defined by this peer
    # File 'lib/roby/distributed/peer.rb', line 166
    def remove_trigger(id)
      Roby.info "#{remote_name} removed #{id} notification"
      triggers.delete(id)
      nil
    end
#removed_sibling(local_id, remote_id) ⇒ Object
Called by the remote peer to announce that it has removed the given siblings. objects is the list of local objects.
It is also used by BasicObject#forget_peer to remove references to an old sibling
    # File 'lib/roby/distributed/subscription.rb', line 116
    def removed_sibling(local_id, remote_id)
      local_object = local_id.local_object
      sibling = local_object.remove_sibling_for(peer, remote_id)

      # It is fine to remove a sibling twice: you may for instance
      # decide in both sides that the sibling should be removed (for
      # instance during the disconnection process)
      if sibling && sibling != remote_id
        raise "removed sibling #{sibling} for #{local_id} on peer #{peer} does not match the provided remote id (#{remote_id})"
      end

      unless local_object.remotely_useful?
        Distributed.removed_objects.delete(local_object)
      end
    end
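The tolerance for double removal noted in the code comment can be illustrated in isolation. In the sketch below, `SiblingHolder` is a hypothetical object that stores one sibling per peer, and the top-level `removed_sibling` helper is an assumption made for the example rather than the PeerServer method itself.

```ruby
# Hypothetical stand-in for a local object that tracks one sibling per peer.
class SiblingHolder
  def initialize
    @siblings = {}
  end

  def add_sibling_for(peer, remote_id)
    @siblings[peer] = remote_id
  end

  # Returns the removed sibling, or nil if there was none.
  def remove_sibling_for(peer, _remote_id)
    @siblings.delete(peer)
  end
end

# Removing twice is fine; removing a different sibling than announced is not.
def removed_sibling(holder, peer, remote_id)
  sibling = holder.remove_sibling_for(peer, remote_id)
  if sibling && sibling != remote_id
    raise ArgumentError, "removed sibling #{sibling} does not match #{remote_id}"
  end
  nil
end

holder = SiblingHolder.new
holder.add_sibling_for(:peer_a, 10)
first  = removed_sibling(holder, :peer_a, 10) # removes the sibling
second = removed_sibling(holder, :peer_a, 10) # already gone: tolerated

holder.add_sibling_for(:peer_a, 11)
mismatch_detected =
  begin
    removed_sibling(holder, :peer_a, 99)
    false
  rescue ArgumentError
    true
  end
```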
#set_relations(object, relations) ⇒ Object
Sets the relations of object according to the description in relations. See #relations_of for how relations is formatted
Note that any relation not listed in relations will actually be removed from the plan. Therefore, if relations is empty, then all relations of object are removed.
    # File 'lib/roby/distributed/subscription.rb', line 161
    def set_relations(object, relations)
      object = peer.local_object(object)
      relations = peer.local_object(relations)

      Distributed.update(object.root_object) do
        all_parents = Hash.new { |h, k| h[k] = ValueSet.new }
        all_children = Hash.new { |h, k| h[k] = ValueSet.new }

        # Add or update existing relations
        relations.each_slice(3) do |graph, parents, children|
          all_objects = parents.map { |p, _| p } + children.map { |c, _| c }
          Distributed.update_all(all_objects) do
            parents.each_slice(2) do |parent, info|
              next unless parent
              all_parents[graph] << parent

              if graph.linked?(parent, object)
                parent[object, graph] = info
              else
                Distributed.update(parent.root_object) do
                  parent.add_child_object(object, graph, info)
                end
              end
            end
            children.each_slice(2) do |child, info|
              next unless child
              all_children[graph] << child

              if graph.linked?(object, child)
                object[child, graph] = info
              else
                Distributed.update(child.root_object) do
                  object.add_child_object(child, graph, info)
                end
              end
            end
          end
        end

        Distributed.each_object_relation(object) do |rel|
          # Remove relations that do not exist anymore
          #
          # If the other end of this relation cannot be seen by
          # our remote peer, keep it: it means that the relation
          # is a local-only annotation this pDB has added to the
          # task
          (object.parent_objects(rel).to_value_set - all_parents[rel]).each do |p|
            # See comment above
            next unless p.distribute?
            Distributed.update_all([p.root_object, object.root_object]) do
              p.remove_child_object(object, rel)
            end
          end
          (object.child_objects(rel).to_value_set - all_children[rel]).each do |c|
            # See comment above
            next unless c.distribute?
            Distributed.update_all([c.root_object, object.root_object]) do
              object.remove_child_object(c, rel)
            end
          end
        end
      end

      nil
    end
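The removal rule amounts to a set difference: whatever is linked locally but not listed by the peer is removed, unless the other end is not distributable and therefore invisible to the peer. The following sketch shows only that computation on made-up data; the symbols stand in for plan objects, Ruby's standard Set stands in for ValueSet, and the `local_only` set is a hypothetical replacement for the `distribute?` check.

```ruby
require "set"

# Hypothetical data: symbols stand in for plan objects.
current_children = Set[:b, :c, :d] # children currently linked locally
listed_children  = Set[:b]         # children listed by the remote peer
local_only       = Set[:d]         # not distributable, so the peer cannot see it

# Children to unlink: present locally, absent from the peer's description,
# and visible to the peer.
to_remove = (current_children - listed_children)
              .reject { |child| local_only.include?(child) }
              .to_a
```

With this data only `:c` is unlinked: `:b` is still listed and `:d` is kept as a local-only annotation.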
#set_relations_commands(plan_object) ⇒ Object
Sends to the peer the set of relations needed to copy the state of plan_object on the remote peer.
    # File 'lib/roby/distributed/subscription.rb', line 145
    def set_relations_commands(plan_object)
      peer.transmit(:set_relations, plan_object, Distributed.relations_of(plan_object))

      if plan_object.respond_to?(:each_plan_child)
        plan_object.each_plan_child do |plan_child|
          peer.transmit(:set_relations, plan_child, Distributed.relations_of(plan_child))
        end
      end
    end
#state_update(new_state) ⇒ Object
Message received to update our view of the remote robot state.
    # File 'lib/roby/distributed/notifications.rb', line 525
    def state_update(new_state)
      peer.state = new_state
      nil
    end
#subscribe(m_object) ⇒ Object
Subscribe the remote peer to changes on object. object must be an object owned locally.
    # File 'lib/roby/distributed/subscription.rb', line 55
    def subscribe(m_object)
      if !(local_object = peer.local_object(m_object, false))
        raise OwnershipError, "no object for #{m_object}"
      elsif !local_object.self_owned?
        raise OwnershipError, "not owner of #{local_object}"
      end

      # We put the subscription process outside the communication
      # thread so that the remote peer can send back the siblings it
      # has created
      peer.queueing do
        peer.transmit(:subscribed, [local_object])

        case local_object
        when PlanObject
          if !local_object.root_object?
            raise ArgumentError, "cannot subscribe to non-root objects"
          end
          subscribe_plan_object(local_object)

        when Plan
          tasks, events = local_object.known_tasks, local_object.free_events
          tasks.delete_if { |t| !t.distribute? }
          events.delete_if { |t| !t.distribute? }

          peer.transmit(:discover_plan, local_object, tasks, events)
          tasks.each { |obj| subscribe_plan_object(obj) }
          events.each { |obj| subscribe_plan_object(obj) }
        end
      end

      local_object.remote_id
    end
#subscribe_plan(sibling) ⇒ Object
The peer wants to subscribe to our main plan
    # File 'lib/roby/distributed/subscription.rb', line 41
    def subscribe_plan(sibling)
      added_sibling(Roby.plan.remote_id, sibling)
      peer.transmit(:subscribed_plan, Roby.plan.remote_id)
      subscribe(Roby.plan)
    end
#subscribe_plan_object(object) ⇒ Object
Called by the peer to subscribe on object. Returns an array which is to be fed to #demux to update the object relations on the remote host
In case of distributed transaction, it is forbidden to subscribe to a proxy without having subscribed to the proxied object first. This method will thus subscribe to both at the same time. Peer#subscribe is supposed to do the same
    # File 'lib/roby/distributed/subscription.rb', line 33
    def subscribe_plan_object(object)
      if Transactions::Proxy === object && object.__getobj__.self_owned?
        subscribe_plan_object(object.__getobj__)
      end
      set_relations_commands(object)
    end
#subscribed(objects) ⇒ Object
Called by the remote peer to announce that it has subscribed us to objects
    # File 'lib/roby/distributed/subscription.rb', line 133
    def subscribed(objects)
      # Register the subscription
      objects.each do |object|
        peer.subscriptions << peer.remote_object(object)
      end
      # Create the proxies
      peer.local_object(objects)
      nil
    end
#subscribed_plan(remote_plan_id) ⇒ Object
Called by our peer because it has subscribed us to its main plan
    # File 'lib/roby/distributed/subscription.rb', line 48
    def subscribed_plan(remote_plan_id)
      peer.proxies[remote_plan_id] = Roby.plan
      peer.remote_plan = remote_plan_id
    end
#synchro_point ⇒ Object
Message received when the first half of a synchro point is reached. See Peer#synchro_point.
    # File 'lib/roby/distributed/communication.rb', line 480
    def synchro_point
      peer.transmit(:done_synchro_point)
      nil
    end
#to_s ⇒ Object
:nodoc:
    # File 'lib/roby/distributed/peer.rb', line 116
    def to_s # :nodoc:
      "PeerServer:#{remote_name}"
    end
#transaction_abandon_commit(trsc, error) ⇒ Object
Message received when a transaction commit is to be abandoned.
    # File 'lib/roby/distributed/transaction.rb', line 457
    def transaction_abandon_commit(trsc, error)
      trsc = peer.local_object(trsc)
      peer.connection_space.transaction_abandon_commit(trsc, error)
      nil
    end
#transaction_commit(trsc) ⇒ Object
Message received when a transaction commit is requested.
    # File 'lib/roby/distributed/transaction.rb', line 451
    def transaction_commit(trsc)
      trsc = peer.local_object(trsc)
      peer.connection_space.transaction_commit(trsc)
      nil
    end
#transaction_discard(trsc) ⇒ Object
Message received when a transaction discard is requested.
    # File 'lib/roby/distributed/transaction.rb', line 463
    def transaction_discard(trsc)
      trsc = peer.local_object(trsc)
      peer.connection_space.transaction_discard(trsc)
      nil
    end
#transaction_give_token(trsc, needs_edition) ⇒ Object
Message received when the transaction edition token is given to this plan manager.
    # File 'lib/roby/distributed/transaction.rb', line 470
    def transaction_give_token(trsc, needs_edition)
      trsc = peer.local_object(trsc)
      trsc.edit!(needs_edition)
      nil
    end
#transaction_prepare_commit(trsc) ⇒ Object
Message received when the ‘prepare’ stage of the transaction commit is requested.
    # File 'lib/roby/distributed/transaction.rb', line 444
    def transaction_prepare_commit(trsc)
      trsc = peer.local_object(trsc)
      peer.connection_space.transaction_prepare_commit(trsc)
      trsc.freezed!
      nil
    end
#trigger(*objects) ⇒ Object
Activate any trigger that may exist on objects. It sends the PeerServer#triggered message for each object that actually matches a registered trigger.
    # File 'lib/roby/distributed/peer.rb', line 123
    def trigger(*objects)
      triggers.each do |id, (matcher, triggered)|
        objects.each do |object|
          if !triggered.include?(object) && matcher === object
            triggered << object
            peer.transmit(:triggered, id, object)
          end
        end
      end
    end
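Each trigger pairs a matcher with the set of objects already announced, so a given object is reported at most once per trigger. The following is a minimal sketch of that bookkeeping, assuming a lambda as matcher (`Proc#===` calls it), Ruby's standard Set in place of ValueSet, and a `sent` array in place of `peer.transmit`; `TriggerRegistry` is a hypothetical name.

```ruby
require "set"

# Hypothetical registry mirroring the id => [matcher, triggered] layout.
class TriggerRegistry
  attr_reader :sent

  def initialize
    @triggers = {}
    @sent = [] # stands in for peer.transmit(:triggered, id, object)
  end

  def add_trigger(id, matcher)
    @triggers[id] = [matcher, Set.new]
    nil
  end

  # Report each matching object once per trigger.
  def trigger(*objects)
    @triggers.each do |id, (matcher, triggered)|
      objects.each do |object|
        next if triggered.include?(object) || !(matcher === object)

        triggered << object
        @sent << [id, object]
      end
    end
    nil
  end
end

registry = TriggerRegistry.new
registry.add_trigger(1, ->(obj) { obj.to_s.start_with?("move") })
registry.trigger(:move_to, :grasp)     # :move_to matches, :grasp does not
registry.trigger(:move_to, :move_back) # :move_to was already reported
```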
#triggered(id, task) ⇒ Object
Message received when task has matched the trigger referenced by id
    # File 'lib/roby/distributed/peer.rb', line 173
    def triggered(id, task)
      peer.triggered(id, task)
      nil
    end
#update_relation(plan, m_from, op, m_to, m_rel, m_info = nil) ⇒ Object
Message received when a relation graph has been updated. op is either add_child_object or remove_child_object and describes what relation modification should be done. The two plan objects m_from and m_to are respectively linked or unlinked in the relation m_rel, with the given information object in case of a new relation.
    # File 'lib/roby/distributed/notifications.rb', line 265
    def update_relation(plan, m_from, op, m_to, m_rel, m_info = nil)
      if plan
        Roby::Distributed.update(peer.local_object(plan)) { update_relation(nil, m_from, op, m_to, m_rel, m_info) }
      else
        from, to = peer.local_object(m_from, false), peer.local_object(m_to, false)

        if !from
          return unless to && (to.self_owned? || to.subscribed?)
          from = peer.local_object(m_from)
        elsif !to
          return unless from && (from.self_owned? || from.subscribed?)
          to = peer.local_object(m_to)
        end

        rel = peer.local_object(m_rel)
        Roby::Distributed.update_all([from.root_object, to.root_object]) do
          if op == :add_child_object
            from.add_child_object(to, rel, peer.local_object(m_info))
          elsif op == :remove_child_object
            from.remove_child_object(to, rel)
          end
        end
      end
      nil
    end
#updated_arguments(task, arguments) ⇒ Object
Message received to announce that the arguments of task have been modified. arguments is a hash containing only the new values.
    # File 'lib/roby/distributed/notifications.rb', line 513
    def updated_arguments(task, arguments)
      proxy = peer.local_object(task)
      arguments = peer.proxy(arguments)
      Distributed.update(proxy) do
        proxy.arguments.merge!(arguments || {})
      end
      nil
    end
#updated_data(task, data) ⇒ Object
Message received to announce that the internal data of task is now data.
    # File 'lib/roby/distributed/notifications.rb', line 504
    def updated_data(task, data)
      proxy = peer.local_object(task)
      proxy.instance_variable_set("@data", peer.proxy(data))
      nil
    end