Class: Roby::Distributed::PeerServer

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped
Defined in:
lib/roby/distributed/peer.rb,
lib/roby/distributed/transaction.rb,
lib/roby/distributed/subscription.rb,
lib/roby/distributed/communication.rb,
lib/roby/distributed/communication.rb,
lib/roby/distributed/notifications.rb,
lib/roby/distributed/notifications.rb,
lib/roby/distributed/distributed_object.rb,
lib/roby/distributed/notifications.rb,
lib/roby/distributed/notifications.rb

Overview

PeerServer objects are the objects which act as servers for the plan managers we are connected on, i.e. it will process the messages sent by those remote plan managers.

The client part, that is the part which actually send the messages, is a Peer object accessible through the Peer#peer attribute.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(peer) ⇒ PeerServer

Create a PeerServer object for the given peer



111
112
113
114
# File 'lib/roby/distributed/peer.rb', line 111

def initialize(peer)
    @peer	    = peer 
    @triggers	    = Hash.new
end

Instance Attribute Details

#current_message_idObject

The ID of the message we are currently processing



476
477
478
# File 'lib/roby/distributed/communication.rb', line 476

def current_message_id
  @current_message_id
end

#peerObject (readonly)

The Peer object we are associated to



105
106
107
# File 'lib/roby/distributed/peer.rb', line 105

def peer
  @peer
end

#triggersObject (readonly)

The set of triggers our peer has added to our plan



108
109
110
# File 'lib/roby/distributed/peer.rb', line 108

def triggers
  @triggers
end

Instance Method Details

#add_owner(object, owner) ⇒ Object

Message received when owner is a peer which now owns object



168
169
170
171
# File 'lib/roby/distributed/distributed_object.rb', line 168

def add_owner(object, owner)
		peer.local_object(object).add_owner(peer.local_object(owner), false)
		nil
end

#add_trigger(id, matcher) ⇒ Object

The peers asks to be notified if a plan object which matches matcher changes



150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/roby/distributed/peer.rb', line 150

def add_trigger(id, matcher)
    triggers[id] = [matcher, (triggered = ValueSet.new)]
    Roby.info "#{remote_name} wants notification on #{matcher} (#{id})"

    peer.queueing do
	matcher.each(plan) do |task|
	    if !triggered.include?(task)
		triggered << task
		peer.transmit(:triggered, id, task)
	    end
	end
    end
    nil
end

#added_sibling(local_id, remote_id) ⇒ Object

Called by the remote peer to announce that it has created the given siblings. siblings is a remote_drbobject => local_object hash

It is also used by BasicObject#sibling_of to register a new sibling



106
107
108
109
# File 'lib/roby/distributed/subscription.rb', line 106

def added_sibling(local_id, remote_id)
		local_id.local_object.add_sibling_for(peer, remote_id)
		nil
end

#completed(result, error, id) ⇒ Object

Message received when a given call, identified by its ID, has been processed on the remote peer. result is the value returned by the method, error an exception object (if an error occured).



505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
# File 'lib/roby/distributed/communication.rb', line 505

def completed(result, error, id)
		call_spec = peer.completion_queue.pop
		if call_spec.message_id != id
 result = Exception.exception("something fishy: ID mismatch in completion queue (#{call_spec.message_id} != #{id}")
 error  = true
 call_spec = nil
		end
		if error
 if call_spec && thread = call_spec.waiting_thread
			result = peer.local_object(result)
			thread.raise result
 else
			Roby::Distributed.fatal "fatal error in communication with #{peer}: #{result.full_message}"
			Roby::Distributed.fatal "disconnecting ..."
			if peer.connected?
  peer.disconnect
			else
  peer.disconnected!
			end
 end

		elsif call_spec
 peer.call_attached_block(call_spec, result)
		end

		nil
end

#completed!(result, error) ⇒ Object

Queue a completion message for our peer. This is usually done automatically in #demux, but it is useful to do it manually in certain conditions, for instance in PeerServer#execute

In #execute, the control thread -> RX thread context switch is not immediate. Therefore, it is possible that events are queued by the control thread while the #completed message is not. #completed! both queues the message and makes sure that #demux won’t.



542
543
544
545
546
547
548
549
550
# File 'lib/roby/distributed/communication.rb', line 542

def completed!(result, error)
		if queued_completion?
 raise "already queued the completed message"
		else
 Distributed.debug { "done, returns #{'error ' if error}#{result || 'nil'} in completed!" }
 self.queued_completion = true
 peer.queue_call false, :completed, [result, error, current_message_id]
		end
end

#completion_group(from_id, to_id) ⇒ Object

Message received to describe a group of consecutive calls that have been completed, when all those calls return nil. This is simply an optimization of the communication protocol, as most remote calls return nil.

from_id is the ID of the first call of the group and to_id the last. Both are included in the group.



494
495
496
497
498
499
# File 'lib/roby/distributed/communication.rb', line 494

def completion_group(from_id, to_id)
		for id in (from_id..to_id)
 completed(nil, nil, id)
		end
		nil
end

#create_sibling(marshalled_object) ⇒ Object

Message sent when our remote peer requests that we create a local representation for one of its objects. It therefore creates a sibling for marshalled_object, which is a representation of a distributed object present on our peer.

It calls #created_sibling on marshalled_object with the new created sibling, to allow for specific operations to be done on it.



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/roby/distributed/distributed_object.rb', line 155

def create_sibling(marshalled_object)
		object_remote_id = peer.remote_object(marshalled_object)
		if sibling = peer.proxies[object_remote_id]
 raise ArgumentError, "#{marshalled_object} has already a sibling (#{sibling})"
		end

		sibling = marshalled_object.sibling(peer)
		peer.subscriptions << object_remote_id
		marshalled_object.created_sibling(peer, sibling)
		nil
end

#disconnectObject

Message received when our peer is closing the connection



409
410
411
412
# File 'lib/roby/distributed/communication.rb', line 409

def disconnect
		peer.disconnected
		nil
end

#discover_neighborhood(object, distance) ⇒ Object

Send the neighborhood of distance hops around object to the peer



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/roby/distributed/peer.rb', line 179

def discover_neighborhood(object, distance)
    object = peer.local_object(object)
    edges = object.neighborhood(distance)
    if object.respond_to?(:each_plan_child)
	object.each_plan_child do |plan_child|
	    edges += plan_child.neighborhood(distance)
	end
    end

    # Replace the relation graphs by their name
    edges.delete_if do |rel, from, to, info|
	!(rel.distribute? && from.distribute? && to.distribute?)
    end
    edges
end

#discover_plan(marshalled_plan, m_tasks, m_events) ⇒ Object

Called by the remote host because it has subscribed us to a plan (a set of tasks and events).



91
92
93
94
95
96
97
98
# File 'lib/roby/distributed/subscription.rb', line 91

def discover_plan(marshalled_plan, m_tasks, m_events)
		plan = peer.local_object(marshalled_plan)
		Distributed.update(plan) do
 peer.local_object(m_tasks)
 peer.local_object(m_events)
		end
		nil
end

#done_synchro_pointObject

Message received when the synchro point is finished.



485
# File 'lib/roby/distributed/communication.rb', line 485

def done_synchro_point; end

#event_add_propagation(only_forward, marshalled_from, marshalled_to, event_id, time, context) ⇒ Object

Message received when the marshalled_from generator has either been forwarded (only_forward = true) or signals (only_forward = false) the marshalled_to generator. The remaining information describes the event itself.



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
# File 'lib/roby/distributed/notifications.rb', line 444

def event_add_propagation(only_forward, marshalled_from, marshalled_to, event_id, time, context)
		from_generator  = peer.local_object(marshalled_from)
		to_generator	= peer.local_object(marshalled_to)
		context         = peer.local_object(context)

		event		= event_for(from_generator, event_id, time, context)

		# Only add the signalling if we own +to+
		if to_generator.self_owned?
 Propagation.add_event_propagation(only_forward, [event], to_generator, event.context, nil)
		else
 # Call #signalling or #forwarding to make
 # +from_generator+ look like as if the event was really
 # fired locally ...
 Distributed.update_all([from_generator.root_object, to_generator.root_object]) do
			if only_forward then from_generator.forwarding(event, to_generator)
			else from_generator.signalling(event, to_generator)
			end
 end
		end

		nil
end

#event_fired(marshalled_from, event_id, time, context) ⇒ Object

Message received when the marshalled_from generator fired an event, with the given event id, time and context.



426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/roby/distributed/notifications.rb', line 426

def event_fired(marshalled_from, event_id, time, context)
		from_generator = peer.local_object(marshalled_from)
		context	       = peer.local_object(context)

		event = event_for(from_generator, event_id, time, context)

		event.send(:propagation_id=, Propagation.propagation_id)
		from_generator.instance_variable_set("@happened", true)
		from_generator.fired(event)
		from_generator.call_handlers(event)

		nil
end

#event_for(generator, event_id, time, context) ⇒ Object

Creates an Event object for generator, with the given argument as parameters, or returns an already existing one



409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/roby/distributed/notifications.rb', line 409

def event_for(generator, event_id, time, context)
		id, event = pending_events[generator]
		if id && id == event_id
 return event
		end
		
		event = generator.new(context)
		event.send(:time=, time)
		if generator.respond_to?(:task)
 generator.task.update_task_status(event)
		end
		pending_events[generator] = [event_id, event]
		event
end

#executeObject

call-seq: execute { … }

Executes the given block in the control thread and return when the block has finished its execution. This method can be called only when serving a remote call.



558
559
560
561
562
563
564
565
566
567
568
569
570
571
# File 'lib/roby/distributed/communication.rb', line 558

def execute
		if !processing?
 return yield
		end

		Roby.execute do
 error = nil
 begin
			result = yield
 rescue Exception => error
 end
 completed!(error || result, !!error, peer.current_message_id)
		end
end

#fatal_error(error, msg, args) ⇒ Object

Message received when an error occured on the remote side, if this error cannot be recovered.



403
404
405
406
# File 'lib/roby/distributed/communication.rb', line 403

def fatal_error(error, msg, args)
		Distributed.fatal "remote reports #{peer.local_object(error)} while processing #{msg}(#{args.join(", ")})"
		disconnect
end

#local_nameObject

The name of the local ConnectionSpace object we are acting on



135
# File 'lib/roby/distributed/peer.rb', line 135

def local_name; peer.local_name end

#planObject

The plan object which is used as a facade for our peer



140
# File 'lib/roby/distributed/peer.rb', line 140

def plan; peer.connection_space.plan end

#plan_discover(plan, m_tasks, m_relations) ⇒ Object

Message received when the set of tasks m_tasks has been discovered by the remote plan. m_relations describes the internal relations between elements of m_tasks. It is in a format suitable for PeerServer#set_relations.



207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/roby/distributed/notifications.rb', line 207

def plan_discover(plan, m_tasks, m_relations)
		Distributed.update(plan = peer.local_object(plan)) do
 tasks = peer.local_object(m_tasks).to_value_set
 Distributed.update_all(tasks) do 
			plan.discover(tasks)
			m_relations.each_slice(2) do |obj, rel|
  set_relations(obj, rel)
			end
 end
		end
		nil
end

#plan_remove_object(plan, object) ⇒ Object

Message received when object has been removed from plan



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/roby/distributed/notifications.rb', line 239

def plan_remove_object(plan, object)
		if local = peer.local_object(object, false)
 # Beware, transaction proxies have no 'plan' attribute
 plan = peer.local_object(plan)
 Distributed.update(plan) do
			Distributed.update(local) do
  plan.remove_object(local)
			end
 end
 local.forget_peer(peer)
		end

rescue ArgumentError => e
		if e.message =~ /has not been included in this plan/
 Roby::Distributed.warn "filtering the 'not included in this plan bug'"
		else
 raise
		end
end

#plan_replace(plan, m_from, m_to) ⇒ Object

Message received when m_from has been replaced by m_to in the plan



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/roby/distributed/notifications.rb', line 222

def plan_replace(plan, m_from, m_to)
		Distributed.update(plan = peer.local_object(plan)) do
 from, to = peer.local_object(m_from), peer.local_object(m_to)

 Distributed.update_all([from, to]) { plan.replace(from, to) }

 # Subscribe to the new task if the old task was subscribed
 # +from+ will be unsubscribed when it is finalized
 if peer.subscribed?(from) && !peer.subscribed?(to)
			peer.subscribe(to)
			nil
 end
		end
		nil
end

#plan_set_mission(plan, task, flag) ⇒ Object

Message received when task has become a mission (flag = true), or has become a non-mission (flag = false) in plan.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/roby/distributed/notifications.rb', line 188

def plan_set_mission(plan, task, flag)
		plan = peer.local_object(plan)
		task = peer.local_object(task)
		if plan.owns?(task)
 if flag
			plan.insert(task)
 else
			plan.discard(task)
 end
		else
 task.mission = flag
		end
		nil
end

#prepare_remove_owner(object, owner) ⇒ Object

Message received before #remove_owner, to verify if the removal operation can be done or not.



179
180
181
182
183
184
# File 'lib/roby/distributed/distributed_object.rb', line 179

def prepare_remove_owner(object, owner)
		peer.local_object(object).prepare_remove_owner(peer.local_object(owner))
		nil
rescue
		$!
end

#query_result_set(query) ⇒ Object

Applies matcher on the local plan and sends back the result



143
144
145
146
# File 'lib/roby/distributed/peer.rb', line 143

def query_result_set(query)
    plan.query_result_set(peer.local_object(query)).
	delete_if { |obj| !obj.distribute? }
end

#remote_nameObject

The name of the remote peer



137
# File 'lib/roby/distributed/peer.rb', line 137

def remote_name; peer.remote_name end

#remove_owner(object, owner) ⇒ Object

Message received when owner does not own object anymore



173
174
175
176
# File 'lib/roby/distributed/distributed_object.rb', line 173

def remove_owner(object, owner)
		peer.local_object(object).remove_owner(peer.local_object(owner), false)
		nil
end

#remove_trigger(id) ⇒ Object

Remove the trigger id defined by this peer



166
167
168
169
170
# File 'lib/roby/distributed/peer.rb', line 166

def remove_trigger(id)
    Roby.info "#{remote_name} removed #{id} notification"
    triggers.delete(id)
    nil
end

#removed_sibling(local_id, remote_id) ⇒ Object

Called by the remote peer to announce that it has removed the given siblings. objects is the list of local objects.

It is also used by BasicObject#forget_peer to remove references to an old sibling



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/roby/distributed/subscription.rb', line 116

def removed_sibling(local_id, remote_id)
		local_object = local_id.local_object
		sibling = local_object.remove_sibling_for(peer, remote_id)

		# It is fine to remove a sibling twice: you nay for instance
		# decide in both sides that the sibling should be removed (for
		# instance during the disconnection process)
		if sibling && sibling != remote_id
 raise "removed sibling #{sibling} for #{local_id} on peer #{peer} does not match the provided remote id (#{remote_id})"
		end

		unless local_object.remotely_useful?
 Distributed.removed_objects.delete(local_object)
		end
end

#set_relations(object, relations) ⇒ Object

Sets the relation of objects according to the description in relations. See #relations_of for how relations is formatted

Note that any relation not listed in relations will actually be removed from the plan. Therefore, if relations is empty, then all relations of object are removed.



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/roby/distributed/subscription.rb', line 161

def set_relations(object, relations)
		object    = peer.local_object(object)
		relations = peer.local_object(relations)

		Distributed.update(object.root_object) do
 all_parents  = Hash.new { |h, k| h[k] = ValueSet.new }
 all_children = Hash.new { |h, k| h[k] = ValueSet.new }
 
 # Add or update existing relations
 relations.each_slice(3) do |graph, parents, children|
			all_objects = parents.map { |p, _| p } + children.map { |c, _| c }
			Distributed.update_all(all_objects) do
  parents.each_slice(2) do |parent, info|
				next unless parent
				all_parents[graph] << parent

				if graph.linked?(parent, object)
   parent[object, graph] = info
				else
   Distributed.update(parent.root_object) do
parent.add_child_object(object, graph, info)
   end
				end
  end
  children.each_slice(2) do |child, info|
				next unless child
				all_children[graph] << child

				if graph.linked?(object, child)
   object[child, graph] = info
				else
   Distributed.update(child.root_object) do
object.add_child_object(child, graph, info)
   end
				end
  end
			end
 end

 Distributed.each_object_relation(object) do |rel|
			# Remove relations that do not exist anymore
			#
			# If the other end of this relation cannot be seen by
			# our remote peer, keep it: it means that the relation
			# is a local-only annotation this pDB has added to the
			# task
			(object.parent_objects(rel).to_value_set - all_parents[rel]).each do |p|
  # See comment above
  next unless p.distribute?
  Distributed.update_all([p.root_object, object.root_object]) do
				p.remove_child_object(object, rel)
  end
			end
			(object.child_objects(rel).to_value_set - all_children[rel]).each do |c|
  # See comment above
  next unless c.distribute?
  Distributed.update_all([c.root_object, object.root_object]) do
				object.remove_child_object(c, rel)
  end
			end
 end
		end

		nil
end

#set_relations_commands(plan_object) ⇒ Object

Sends to the peer the set of relations needed to copy the state of plan_object on the remote peer.



145
146
147
148
149
150
151
152
153
# File 'lib/roby/distributed/subscription.rb', line 145

def set_relations_commands(plan_object)
		peer.transmit(:set_relations, plan_object, Distributed.relations_of(plan_object))

		if plan_object.respond_to?(:each_plan_child)
 plan_object.each_plan_child do |plan_child|
			peer.transmit(:set_relations, plan_child, Distributed.relations_of(plan_child))
 end
		end
end

#state_update(new_state) ⇒ Object

Message received to update our view of the remote robot state.



525
526
527
528
# File 'lib/roby/distributed/notifications.rb', line 525

def state_update(new_state)
		peer.state = new_state
		nil
end

#subscribe(m_object) ⇒ Object

Subscribe the remote peer to changes on object. object must be an object owned locally.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/roby/distributed/subscription.rb', line 55

def subscribe(m_object)
		if !(local_object = peer.local_object(m_object, false))
 raise OwnershipError, "no object for #{m_object}"
		elsif !local_object.self_owned?
 raise OwnershipError, "not owner of #{local_object}"
		end

		# We put the subscription process outside the communication
		# thread so that the remote peer can send back the siblings it
		# has created
		peer.queueing do
 peer.transmit(:subscribed, [local_object])

 case local_object
 when PlanObject
			if !local_object.root_object?
  raise ArgumentError, "cannot subscribe to non-root objects"
			end
			subscribe_plan_object(local_object)

 when Plan
			tasks, events = local_object.known_tasks, local_object.free_events
			tasks.delete_if    { |t| !t.distribute? }
			events.delete_if   { |t| !t.distribute? }

			peer.transmit(:discover_plan, local_object, tasks, events)
			tasks.each  { |obj| subscribe_plan_object(obj) }
			events.each { |obj| subscribe_plan_object(obj) }
 end
		end

		local_object.remote_id
end

#subscribe_plan(sibling) ⇒ Object

The peer wants to subscribe to our main plan



41
42
43
44
45
# File 'lib/roby/distributed/subscription.rb', line 41

def subscribe_plan(sibling)
		added_sibling(Roby.plan.remote_id, sibling)
		peer.transmit(:subscribed_plan, Roby.plan.remote_id)
		subscribe(Roby.plan)
end

#subscribe_plan_object(object) ⇒ Object

Called by the peer to subscribe on object. Returns an array which is to be fed to #demux to update the object relations on the remote host

In case of distributed transaction, it is forbidden to subscribe to a proxy without having subscribed to the proxied object first. This method will thus subscribe to both at the same time. Peer#subscribe is supposed to do the same



33
34
35
36
37
38
# File 'lib/roby/distributed/subscription.rb', line 33

def subscribe_plan_object(object)
		if Transactions::Proxy === object && object.__getobj__.self_owned?
 subscribe_plan_object(object.__getobj__)
		end
		set_relations_commands(object)
end

#subscribed(objects) ⇒ Object

Called by the remote peer to announce that is has subscribed us to objects



133
134
135
136
137
138
139
140
141
# File 'lib/roby/distributed/subscription.rb', line 133

def subscribed(objects)
		# Register the subscription
		objects.each do |object|
 peer.subscriptions << peer.remote_object(object)
		end
		# Create the proxies
		peer.local_object(objects)
		nil
end

#subscribed_plan(remote_plan_id) ⇒ Object

Called by our peer because it has subscribed us to its main plan



48
49
50
51
# File 'lib/roby/distributed/subscription.rb', line 48

def subscribed_plan(remote_plan_id)
		peer.proxies[remote_plan_id] = Roby.plan
		peer.remote_plan = remote_plan_id
end

#synchro_pointObject

Message received when the first half of a synchro point is reached. See Peer#synchro_point.



480
481
482
483
# File 'lib/roby/distributed/communication.rb', line 480

def synchro_point
		peer.transmit(:done_synchro_point)
		nil
end

#to_sObject

:nodoc:



116
117
118
# File 'lib/roby/distributed/peer.rb', line 116

def to_s # :nodoc:
    "PeerServer:#{remote_name}" 
end

#transaction_abandon_commit(trsc, error) ⇒ Object

Message received when a transaction commit is to be abandonned.



457
458
459
460
461
# File 'lib/roby/distributed/transaction.rb', line 457

def transaction_abandon_commit(trsc, error)
		trsc = peer.local_object(trsc)
		peer.connection_space.transaction_abandon_commit(trsc, error)
		nil
end

#transaction_commit(trsc) ⇒ Object

Message received when a transaction commit is requested.



451
452
453
454
455
# File 'lib/roby/distributed/transaction.rb', line 451

def transaction_commit(trsc)
		trsc = peer.local_object(trsc)
		peer.connection_space.transaction_commit(trsc)
		nil
end

#transaction_discard(trsc) ⇒ Object

Message received when a transaction discard is requested.



463
464
465
466
467
# File 'lib/roby/distributed/transaction.rb', line 463

def transaction_discard(trsc)
		trsc = peer.local_object(trsc)
		peer.connection_space.transaction_discard(trsc)
		nil
end

#transaction_give_token(trsc, needs_edition) ⇒ Object

Message received when the transaction edition token is given to this plan manager.



470
471
472
473
474
# File 'lib/roby/distributed/transaction.rb', line 470

def transaction_give_token(trsc, needs_edition)
		trsc = peer.local_object(trsc)
		trsc.edit!(needs_edition)
		nil
end

#transaction_prepare_commit(trsc) ⇒ Object

Message received when the ‘prepare’ stage of the transaction commit is requested.



444
445
446
447
448
449
# File 'lib/roby/distributed/transaction.rb', line 444

def transaction_prepare_commit(trsc)
		trsc = peer.local_object(trsc)
		peer.connection_space.transaction_prepare_commit(trsc)
		trsc.freezed!
		nil
end

#trigger(*objects) ⇒ Object

Activate any trigger that may exist on objects It sends the PeerServer#triggered message for each objects that are actually matching a registered trigger.



123
124
125
126
127
128
129
130
131
132
# File 'lib/roby/distributed/peer.rb', line 123

def trigger(*objects)
    triggers.each do |id, (matcher, triggered)|
	objects.each do |object|
	    if !triggered.include?(object) && matcher === object
		triggered << object
		peer.transmit(:triggered, id, object)
	    end
	end
    end
end

#triggered(id, task) ⇒ Object

Message received when task has matched the trigger referenced by id



173
174
175
176
# File 'lib/roby/distributed/peer.rb', line 173

def triggered(id, task)
    peer.triggered(id, task) 
    nil
end

#update_relation(plan, m_from, op, m_to, m_rel, m_info = nil) ⇒ Object

Message received when a relation graph has been updated. op is either add_child_object or remove_child_object and describes what relation modification should be done. The two plan objects m_from and m_to are respectively linked or unlinked in the relation m_rel, with the given information object in case of a new relation.



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/roby/distributed/notifications.rb', line 265

def update_relation(plan, m_from, op, m_to, m_rel, m_info = nil)
		if plan
 Roby::Distributed.update(peer.local_object(plan)) { update_relation(nil, m_from, op, m_to, m_rel, m_info) }
		else
 from, to = 
			peer.local_object(m_from, false), 
			peer.local_object(m_to, false)

 if !from
			return unless to && (to.self_owned? || to.subscribed?)
			from = peer.local_object(m_from)
 elsif !to
			return unless from && (from.self_owned? || from.subscribed?)
			to   = peer.local_object(m_to)
 end

 rel = peer.local_object(m_rel)
 Roby::Distributed.update_all([from.root_object, to.root_object]) do
			if op == :add_child_object
  from.add_child_object(to, rel, peer.local_object(m_info))
			elsif op == :remove_child_object
  from.remove_child_object(to, rel)
			end
 end
		end
		nil
end

#updated_arguments(task, arguments) ⇒ Object

Message received to announce that the arguments of task have been modified. arguments is a hash containing only the new values.



513
514
515
516
517
518
519
520
# File 'lib/roby/distributed/notifications.rb', line 513

def updated_arguments(task, arguments)
		proxy = peer.local_object(task)
		arguments = peer.proxy(arguments)
		Distributed.update(proxy) do
 proxy.arguments.merge!(arguments || {})
		end
		nil
end

#updated_data(task, data) ⇒ Object

Message received to announce that the internal data of task is now data.



504
505
506
507
508
# File 'lib/roby/distributed/notifications.rb', line 504

def updated_data(task, data)
		proxy = peer.local_object(task)
		proxy.instance_variable_set("@data", peer.proxy(data))
		nil
end