Class: Roby::Distributed::Peer

Inherits:
Object
  • Object
Includes:
DRbUndumped
Defined in:
lib/roby/distributed/peer.rb,
lib/roby/distributed/protocol.rb,
lib/roby/distributed/transaction.rb,
lib/roby/distributed/subscription.rb,
lib/roby/distributed/communication.rb,
lib/roby/distributed/distributed_object.rb

Overview

A Peer object is the client part of a connection with a remote plan manager. The server part, i.e. the object which actually receives requests from the remote plan manager, is the PeerServer object accessible through the Peer#local_server attribute.

Connection procedure

Connections are initiated when the user calls Peer.initiate_connection. The following protocol is then followed:

local: if the neighbour is already connected to us, we do nothing and yield the already existing peer. End.

local: check if we are already connecting to the peer. If it is the case, wait for the end of the connection thread.

local: otherwise, open a new socket and send the connect() message on it. The connection thread is registered in ConnectionSpace.pending_connections.

remote: check if we are already connecting to the peer (check ConnectionSpace.pending_connections)

  • if it is the case, the lowest token wins

  • if ‘remote’ wins, return :already_connecting

  • if ‘local’ wins, return :connected with the relevant information
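The tie-break performed on the remote side can be sketched as follows. This is only an illustration under stated assumptions: `arbitrate` is a hypothetical helper, not part of Roby, and tokens are modelled as plain integers, whereas the library uses ConnectionToken objects.

```ruby
# Hypothetical sketch of the decision taken by the side that receives connect().
# pending_token: token of our own in-flight connection to that peer, or nil.
# remote_token:  token carried by the incoming connect() message.
def arbitrate(pending_token, remote_token)
  if pending_token && pending_token < remote_token
    # Our own attempt holds the lowest token, so it wins: the caller has to
    # rely on the connection we are already establishing.
    :already_connecting
  else
    # No competing attempt, or the caller's token is the lowest: accept.
    :connected
  end
end

arbitrate(nil, 7) # no pending connection
arbitrate(3, 7)   # our token is lower
arbitrate(9, 7)   # the caller's token is lower
```

The three calls cover the cases listed above: no competing attempt, the receiving side winning, and the initiating side winning.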

Communication

Communication is done in two threads. The sending thread gets the calls from Peer#send_queue, formats them and sends them to the PeerServer#demux for processing. The reception thread is managed by dRb and its entry point is always #demux.

Very often we need processing on both sides to finish an operation. For instance, the creation of two siblings needs to register the siblings on both sides. To manage that, PeerServer methods which are serving a remote request can queue callbacks. These callbacks will be processed by Peer#send_thread before the rest of the queue is processed.

Defined Under Namespace

Classes: ComStats, ConnectionToken, DRoby

Constant Summary

CYCLE_END_CALLS =

This set of calls marks the end of a cycle. When one of these is encountered, the calls gathered in #current_cycle are moved into #send_queue.

[:connect, :disconnect, :fatal_error, :state_update]

@@message_id =

0
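The role of CYCLE_END_CALLS can be illustrated with a small stand-alone sketch. `CycleBuffer` and its `queue_call` are hypothetical stand-ins, and pushing the whole cycle as a single array is an assumption made for the example, not a statement about how Roby stores its CallSpec objects.

```ruby
CYCLE_END_CALLS = [:connect, :disconnect, :fatal_error, :state_update]

# Hypothetical stand-in for the peer's buffering: calls accumulate in
# current_cycle and are flushed as one batch when a cycle-end call arrives.
class CycleBuffer
  attr_reader :current_cycle, :send_queue

  def initialize
    @current_cycle = []
    @send_queue    = Queue.new
  end

  def queue_call(m, *args)
    current_cycle << [m, args]
    return unless CYCLE_END_CALLS.include?(m)

    # Cycle-end call: hand the gathered calls over to the sending side.
    send_queue << current_cycle.dup
    current_cycle.clear
  end
end

buffer = CycleBuffer.new
buffer.queue_call(:update_relation, 1) # stays in current_cycle
buffer.queue_call(:state_update, {})   # ends the cycle and flushes it
batch = buffer.send_queue.pop
```

After the second call, `batch` holds both calls in order and `current_cycle` is empty again.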

Constructor Details

#initialize(connection_space, socket, remote_name, remote_id, remote_state, &block) ⇒ Peer

Creates a Peer object for the peer connected at socket. This peer is to be managed by connection_space. If a block is given, it is called in the control thread when the connection is finalized.

# File 'lib/roby/distributed/peer.rb', line 282

def initialize(connection_space, socket, remote_name, remote_id, remote_state, &block)
    # Initialize the remote name with the socket parameters. It will be set to 
    # the real name during the connection process
    @remote_name = remote_name
    @remote_id   = remote_id
    @peer_info   = socket.peer_info

    super() if defined? super

    @connection_space = connection_space
    @local_server = PeerServer.new(self)
    @proxies      = Hash.new
    @removing_proxies = Hash.new { |h, k| h[k] = Array.new }
    @mutex        = Mutex.new
    @triggers     = Hash.new
    @socket       = socket
    @stats        = ComStats.new 0, 0
    @dead         = false
    @disabled_rx  = 0
    @disabled_tx  = 0
    connection_space.pending_sockets << [socket, self]

    @connection_state = :connected
    @send_queue       = Queue.new
    @completion_queue = Queue.new
    @current_cycle    = Array.new

    Roby::Distributed.peers[remote_id]   = self
    local_server.state_update remote_state

    @task = ConnectionTask.new :peer => self
    Roby::Control.once do
        connection_space.plan.permanent(task)
        task.start!
        task.emit(:ready)
    end

    @send_thread      = Thread.new(&method(:communication_loop))
end

Instance Attribute Details

#completion_queue ⇒ Object (readonly)

The queue of calls that have been sent to our peer, but for which a completed message has not been received. This is a queue of CallSpec objects.

# File 'lib/roby/distributed/communication.rb', line 588

def completion_queue
  @completion_queue
end

#connection_space ⇒ Object (readonly)

The local ConnectionSpace object we act on

# File 'lib/roby/distributed/peer.rb', line 237

def connection_space
  @connection_space
end

#connection_state ⇒ Object (readonly)

A value indicating the current status of the connection. It can be one of :connected, :disconnecting, :disconnected

# File 'lib/roby/distributed/communication.rb', line 45

def connection_state
  @connection_state
end

#current_cycle ⇒ Object (readonly)

The cycle data which is being gathered before queueing it into #send_queue

# File 'lib/roby/distributed/communication.rb', line 590

def current_cycle
  @current_cycle
end

#local_server ⇒ Object (readonly)

The local PeerServer object for this peer

# File 'lib/roby/distributed/peer.rb', line 239

def local_server
  @local_server
end

#mutex ⇒ Object (readonly)

The main synchronization mutex to access the peer. See also Peer#synchronize

# File 'lib/roby/distributed/communication.rb', line 577

def mutex
  @mutex
end

#name ⇒ Object (readonly)

The peer name

# File 'lib/roby/distributed/peer.rb', line 323

def name
  @name
end

#peer_info ⇒ Object (readonly)

The [host, port] pair at the peer end

# File 'lib/roby/distributed/peer.rb', line 269

def peer_info
  @peer_info
end

#proxies ⇒ Object (readonly)

The set of proxies for objects from this remote peer

# File 'lib/roby/distributed/peer.rb', line 241

def proxies
  @proxies
end

#remote_id ⇒ Object (readonly)

The object which identifies this peer on the network

# File 'lib/roby/distributed/peer.rb', line 265

def remote_id
  @remote_id
end

#remote_name ⇒ Object (readonly)

The name of the remote peer

# File 'lib/roby/distributed/peer.rb', line 267

def remote_name
  @remote_name
end

#remote_plan ⇒ Object

The RemoteID for the peer main plan

# File 'lib/roby/distributed/subscription.rb', line 271

def remote_plan
  @remote_plan
end

#removing_proxies ⇒ Object (readonly)

The set of proxies we are currently removing. See BasicObject#forget_peer

# File 'lib/roby/distributed/peer.rb', line 243

def removing_proxies
  @removing_proxies
end

#send_queue ⇒ Object (readonly)

The queue which holds all calls to the remote peer. Calls are saved as CallSpec objects

# File 'lib/roby/distributed/communication.rb', line 584

def send_queue
  @send_queue
end

#send_thread ⇒ Object (readonly)

The transmission thread

# File 'lib/roby/distributed/communication.rb', line 581

def send_thread
  @send_thread
end

#socket ⇒ Object (readonly)

The connection socket with our peer

# File 'lib/roby/distributed/peer.rb', line 245

def socket
  @socket
end

#state ⇒ Object

The remote state

# File 'lib/roby/distributed/peer.rb', line 277

def state
  @state
end

#stats ⇒ Object (readonly)

A ComStats object which holds the communication statistics for this peer. stats.tx is the count of bytes sent to the peer while stats.rx is the count of bytes received.

# File 'lib/roby/distributed/peer.rb', line 251

def stats
  @stats
end

#task ⇒ Object (readonly)

The ConnectionTask object for this peer

# File 'lib/roby/distributed/peer.rb', line 325

def task
  @task
end

#triggers ⇒ Object (readonly)

The ID => block hash of all triggers we have defined on the remote plan

# File 'lib/roby/distributed/peer.rb', line 275

def triggers
  @triggers
end

Class Method Details

.abort_connection_thread(connection_space, remote_id, lock = true) ⇒ Object



# File 'lib/roby/distributed/communication.rb', line 99

def self.abort_connection_thread(connection_space, remote_id, lock = true)
    if lock
        connection_space.synchronize do
            abort_connection_thread(connection_space, remote_id, false)
        end
    end

    connection_space.pending_connections.delete(remote_id)
    if peer = connection_space.peers[remote_id]
        begin
            connection_space.mutex.unlock
            peer.disconnected(:aborted)
        ensure
            connection_space.mutex.lock
        end
    end
end

.connect(neighbour) ⇒ Object

Connect to neighbour and return the corresponding peer. It is a blocking method, so it is an error to call it from within the control thread



# File 'lib/roby/distributed/communication.rb', line 49

def self.connect(neighbour)
    Roby.condition_variable(true) do |cv, mutex|
        peer = nil
        mutex.synchronize do
            thread = initiate_connection(Distributed.state, neighbour) do |peer|
                return peer unless thread
            end

            begin
                mutex.unlock
                thread.value
            rescue Exception => e
                connection_space.synchronize do
                    connection_space.pending_connections.delete(neighbour.remote_id)
                end
                raise ConnectionFailed.new(neighbour), e.message
            ensure
                mutex.lock
            end
        end
    end
end

.connection_request(connection_space, socket) ⇒ Object

Create a Peer object for a connection attempt on the server socket. There is nothing to do here. The remote peer is supposed to send us a #connect message, after which we can assume that the connection is up.

# File 'lib/roby/distributed/communication.rb', line 208

def self.connection_request(connection_space, socket)
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

    # read connection info from +socket+
    info_size = *socket.read(4).unpack("N")
    m, remote_token, remote_name, remote_id, remote_state =
        Marshal.load(socket.read(info_size))

    Distributed.debug "connection attempt from #{socket}: #{m} #{remote_name} #{remote_id}"

    connection_space.synchronize do
        # Now check the connection status
        if old_peer = connection_space.aborted_connections.delete(remote_id)
            reply = [:aborted]
        elsif m == :connect && peer = connection_space.peers[remote_id]
            reply = [:already_connected]
        else
            token, connecting_thread = connection_space.pending_connections[remote_id]
            if token && token < remote_token
                if connecting_thread
                    begin
                        connection_space.mutex.unlock
                        connecting_thread.join
                    ensure
                        connection_space.mutex.lock
                    end
                end
                reply = [:already_connecting]
            elsif m == :reconnect
                peer = connection_space.peers[remote_id]
                peer.reconnected(socket)
                reply = [:reconnected]
            else
                peer = new(connection_space, socket, remote_name, remote_id, remote_state)
                reply = [:connected, connection_space.name,
                    connection_space.remote_id,
                    Distributed.format(Roby::State)]
            end
        end

        Distributed.debug "connection attempt from #{socket}: #{reply[0]}"
        reply = Marshal.dump(reply)
        socket.write [reply.size].pack("N")
        socket.write reply
    end
end
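The handshake above frames each message as a 4-byte big-endian length followed by a Marshal payload. A minimal round trip of that framing, using an in-memory StringIO instead of a socket, could look like the sketch below. `write_message` and `read_message` are hypothetical helper names introduced for the example.

```ruby
require "stringio"

# Write one message: 4-byte big-endian length, then the Marshal payload.
def write_message(io, message)
  payload = Marshal.dump(message)
  io.write [payload.bytesize].pack("N")
  io.write payload
end

# Read one message back; returns nil if the stream ended before a full header.
def read_message(io)
  header = io.read(4)
  return nil unless header && header.bytesize == 4

  size = header.unpack1("N")
  Marshal.load(io.read(size))
end

io = StringIO.new(String.new) # binary, writable in-memory buffer
write_message(io, [:connect, "local-name", 42])
io.rewind
message = read_message(io)
```

Reading the length first lets the receiver know exactly how many bytes to hand to Marshal.load, which is what connection_request does with socket.read(info_size).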

.initiate_connection(connection_space, neighbour, &block) ⇒ Object

Start connecting to neighbour in another thread and yield the corresponding Peer object. This is safe to call if we have already connected to neighbour, in which case the already existing peer is returned.

The Peer object is yielded from within the control thread, only when the :ready event of the peer’s ConnectionTask has been emitted

Returns the connection thread

# File 'lib/roby/distributed/communication.rb', line 82

def self.initiate_connection(connection_space, neighbour, &block)
    connection_space.synchronize do
        if peer = connection_space.peers[neighbour.remote_id]
            # already connected
            yield(peer) if block_given?
            return
        end

        local_token = ConnectionToken.new
        call = [:connect, local_token,
            connection_space.name,
            connection_space.remote_id,
            Distributed.format(Roby::State)]
        send_connection_request(connection_space, neighbour, call, local_token, &block)
    end
end

.send_connection_request(connection_space, neighbour, call, local_token, &block) ⇒ Object

Generic handling of connection/reconnection initiated by this side



# File 'lib/roby/distributed/communication.rb', line 175

def self.send_connection_request(connection_space, neighbour, call, local_token, &block) # :nodoc:
    remote_id = neighbour.remote_id
    token, connecting_thread = connection_space.pending_connections[remote_id]
    if token
        # we are already connecting to the peer, check the connection token
        peer = begin
                   connection_space.mutex.unlock
                   connecting_thread.value
               ensure
                   connection_space.mutex.lock
               end

        if token < local_token
            if !peer
                raise "something went wrong during connection: got nil peer with better token"
            end
            yield(peer) if block_given?
            return
        end
    end

    connecting_thread = Thread.new do
        send_connection_thread(connection_space, neighbour, call, local_token, &block)
    end
    connection_space.pending_connections[remote_id] = [local_token, connecting_thread]
    connecting_thread
end

.send_connection_thread(connection_space, neighbour, call, local_token, &block) ⇒ Object



# File 'lib/roby/distributed/communication.rb', line 117

def self.send_connection_thread(connection_space, neighbour, call, local_token, &block)
    remote_id = neighbour.remote_id
    Thread.current.abort_on_exception = false

    begin
        socket = TCPSocket.new(remote_id.uri, remote_id.ref)
    rescue Errno::ECONNRESET, Errno::ECONNREFUSED
        abort_connection_thread(connection_space, remote_id)
        return
    end

    begin
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        Distributed.debug "#{call[0]}: #{neighbour} on #{socket.peer_info}"

        # Send the connection request
        call = Marshal.dump(call)
        socket.write [call.size].pack("N")
        socket.write call

        reply_size = socket.read(4)
        if !reply_size
            raise "peer disconnected"
        end
        reply = Marshal.load(socket.read(*reply_size.unpack("N")))
    rescue Errno::ECONNRESET, Errno::ENOTCONN
        abort_connection_thread(connection_space, remote_id)
        return
    end

    connection_space.synchronize do
        connection_space.pending_connections.delete(remote_id)
        m = reply.shift
        Roby::Distributed.debug "remote peer #{m}"

        # if the remote peer is also connecting, and if its
        # token is better than our own, m will be nil and thus
        # the thread will finish without doing anything

        case m
        when :connected
            peer = new(connection_space, socket, *reply)
        when :reconnected
            peer = connection_space.peers[remote_id]
            peer.reconnected(socket)
        when :aborted
            abort_connection_thread(connection_space, remote_id, false)
            return
        when :already_connecting, :already_connected
            peer = connection_space.peers[remote_id]
        end

        yield(peer) if peer && block_given?
        peer
    end
end

Instance Method Details

#call(m, *args, &block) ⇒ Object

Note that it is forbidden to use this method in control or communication threads, as it would make the application deadlock



# File 'lib/roby/distributed/communication.rb', line 750

def call(m, *args, &block)
    if !Roby.outside_control? || Roby::Control.taken_mutex?
        raise "cannot use Peer#call in control thread or while taking the Roby::Control mutex"
    end

    result = nil
    Roby.condition_variable(true) do |cv, mt|
        mt.synchronize do
            Distributed.debug do
                "calling #{remote_name}.#{m}"
            end

            callback = Proc.new do |return_value|
                mt.synchronize do
                    result = return_value
                    block.call(return_value) if block
                    cv.broadcast
                end
            end

            queue_call false, m, args, callback, Thread.current
            cv.wait(mt)
        end
    end

    result
end
```
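The blocking behaviour of #call comes from waiting on a condition variable until the completion callback fires. The sketch below reproduces that pattern with plain Ruby primitives. `async_call` is a hypothetical stand-in for #queue_call that runs the callback from another thread, and doubling the value is just a placeholder for the remote work.

```ruby
# Hypothetical asynchronous API: the callback is invoked from another thread.
def async_call(value, &callback)
  Thread.new { callback.call(value * 2) }
end

# Turn the callback-based API into a blocking call.
def blocking_call(value)
  mutex  = Mutex.new
  cv     = ConditionVariable.new
  done   = false
  result = nil

  mutex.synchronize do
    async_call(value) do |return_value|
      mutex.synchronize do
        result = return_value
        done   = true
        cv.broadcast
      end
    end
    # Loop on a flag, since condition variables may wake up spuriously.
    cv.wait(mutex) until done
  end
  result
end

answer = blocking_call(21)
```

The caller sleeps until some other thread runs the callback. If the thread that is supposed to run the callback is the one that is waiting, nothing can ever wake it up, which is why #call refuses to run in the control thread.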

#call_attached_block(call, result) ⇒ Object

Calls the completion block that has been given to #transmit when call is completed (the on_completion parameter of #queue_call). A remote call is completed when it has been processed remotely and the callbacks returned by the remote server (if any) have been processed as well. result is the value returned by the remote server.



# File 'lib/roby/distributed/communication.rb', line 857

def call_attached_block(call, result)
    if block = call.on_completion
        begin
            Roby::Distributed.debug "calling completion block #{block} for #{call}"
            block.call(result)
        rescue Exception => e
            Roby.application_error(:droby_callbacks, block, e)
        end
    end
end

#check_marshallable(object, stack = ValueSet.new) ⇒ Object

Checks that object is marshallable. If object is a collection, it will check that each of its elements is marshallable first. This is automatically called for all messages if DEBUG_MARSHALLING is set to true.



# File 'lib/roby/distributed/communication.rb', line 598

def check_marshallable(object, stack = ValueSet.new)
    if !object.kind_of?(DRbObject) && object.respond_to?(:each) && !object.kind_of?(String)
        if stack.include?(object)
            # report the collection that was found a second time on the stack
            Roby.warn "recursive marshalling of #{object}"
            raise "recursive marshalling"
        end

        stack << object
        begin
            object.each do |obj|
                marshalled = begin
                                 check_marshallable(obj, stack)
                             rescue Exception
                                 raise TypeError, "cannot dump #{obj}(#{obj.class}): #{$!.message}"
                             end

                if Marshal.load(marshalled).kind_of?(DRb::DRbUnknown)
                    raise TypeError, "cannot load #{obj}(#{obj.class})"
                end
            end
        ensure
            stack.delete(object)
        end
    end
    Marshal.dump(object)
end
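The idea behind this check, walking nested collections while keeping a stack of the containers being visited, can be tried outside Roby with the simplified sketch below. `marshallable!` is a hypothetical helper; it uses a standard Set compared by identity instead of ValueSet and leaves out the DRb-specific checks.

```ruby
require "set"

# Simplified, hypothetical variant: fail on containers that contain
# themselves, and report elements that Marshal cannot dump.
def marshallable!(object, stack = Set.new.compare_by_identity)
  if object.respond_to?(:each) && !object.is_a?(String)
    raise "recursive marshalling" if stack.include?(object)

    stack << object
    begin
      object.each { |element| marshallable!(element, stack) }
    ensure
      stack.delete(object)
    end
  end
  Marshal.dump(object)
rescue TypeError => e
  raise TypeError, "cannot dump #{object.class}: #{e.message}"
end

dumped = marshallable!([1, "a", { b: 2 }])
```

Checking elements before their container means the error message names the innermost offending object rather than only the outer collection.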

#communication_loop ⇒ Object

Main loop of the thread which communicates with the remote peer

# File 'lib/roby/distributed/communication.rb', line 779

def communication_loop
    Thread.current.priority = 2
    id = 0
    data   = nil
    buffer = StringIO.new(" " * 8, 'w')

    loop do
        data ||= send_queue.shift
        return if disconnected?

        # Wait for the link to be alive before sending anything
        while !link_alive?
            return if disconnected?
            connection_space.wait_next_discovery
        end
        return if disconnected?

        buffer.truncate(8)
        buffer.seek(8)
        Marshal.dump(data, buffer)
        buffer.string[0, 8] = [id += 1, buffer.size - 8].pack("NN")

        begin
            size = buffer.string.size
            Roby::Distributed.debug { "sending #{size}B to #{self}" }
            stats.tx += size
            socket.write(buffer.string)

            data = nil
        rescue Errno::EPIPE
            @dead = true
            # communication error, retry sending the data (or, if we are disconnected, return)
        end
    end

rescue Interrupt
rescue Exception
    Distributed.fatal do
        "While sending #{data.inspect}\n" +
        "Communication thread dies with\n#{$!.full_message}"
    end

    disconnected!

ensure
    Distributed.info "communication thread quitting for #{self}. Rx: #{stats.rx}B, Tx: #{stats.tx}B"
    calls = []
    while !completion_queue.empty?
        calls << completion_queue.shift
    end

    calls.each do |call_spec|
        next unless call_spec
        if thread = call_spec.waiting_thread
            thread.raise DisconnectedError
        end
    end

    Distributed.info "communication thread quit for #{self}"
end
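Each packet written by this loop starts with an 8-byte header, two 32-bit big-endian integers holding the message id and the payload size, followed by the Marshal payload. The sketch below builds and parses such a packet. `encode_packet` and `decode_packet` are hypothetical names, and the receiving side shown here is an assumption about how such a header would be consumed, not the PeerServer implementation.

```ruby
# Build one packet: 8-byte header (message id, payload size) + Marshal payload.
def encode_packet(id, data)
  payload = Marshal.dump(data)
  [id, payload.bytesize].pack("NN") + payload
end

# Split a packet back into its message id and payload.
def decode_packet(packet)
  id, size = packet[0, 8].unpack("NN")
  [id, Marshal.load(packet[8, size])]
end

packet = encode_packet(1, [[:state_update, []]])
decoded_id, decoded_data = decode_packet(packet)
```

The loop above reaches the same layout differently: it reserves the 8 header bytes in a reusable StringIO, dumps the payload after them, then fills in the header once the size is known.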

#connected? ⇒ Boolean

Returns true if the connection has been established. See also #link_alive?

Returns:

  • (Boolean)

# File 'lib/roby/distributed/communication.rb', line 356

def connected?; connection_state == :connected end

#create_sibling(object) ⇒ Object

Creates a sibling for object on the peer, and returns the corresponding DRbObject



# File 'lib/roby/distributed/distributed_object.rb', line 190

def create_sibling(object)
    unless object.kind_of?(DistributedObject)
        raise TypeError, "cannot create a sibling for a non-distributed object"
    end

    call(:create_sibling, object)
    subscriptions << object.sibling_on(self)
    Roby::Control.synchronize do
        local_server.subscribe(object)
    end

    synchro_point
end

#disable_rx ⇒ Object

Disables the receiving part of the communication link. It is an accumulator: if #disable_rx is called twice, then RX will be reenabled only when #enable_rx is also called twice.

# File 'lib/roby/distributed/communication.rb', line 380

def disable_rx; @disabled_rx += 1 end

#disable_tx ⇒ Object

Disables the sending part of the communication link. It is an accumulator: if #disable_tx is called twice, then TX will be reenabled only when #enable_tx is also called twice.

# File 'lib/roby/distributed/communication.rb', line 370

def disable_tx; @disabled_tx += 1 end

#disabled_rx? ⇒ Boolean

True if RX is currently disabled

Returns:

  • (Boolean)

# File 'lib/roby/distributed/communication.rb', line 386

def disabled_rx?; @disabled_rx > 0 end

#disabled_tx? ⇒ Boolean

True if TX is currently disabled

Returns:

  • (Boolean)

# File 'lib/roby/distributed/communication.rb', line 376

def disabled_tx?; @disabled_tx > 0 end

#disconnect ⇒ Object

Normal disconnection procedure.

The procedure is as follows:

  • we set the connection state as ‘disconnecting’. This disables all notifications for this peer (see for instance Distributed.each_subscribed_peer)

  • we queue the :disconnected message

At this point, we are waiting for the remote peer to do the same: send us ‘disconnected’. When we receive that message, we put the connection into the disconnected state and all transmission is forbidden. We make the transmission thread quit then, and the ‘failed’ event is emitted on the ConnectionTask task

Note that once the connection leaves the connected state, the only messages allowed by #queue_call are ‘completed’ and ‘disconnected’



# File 'lib/roby/distributed/communication.rb', line 288

def disconnect
    synchronize do
        Roby::Distributed.info "disconnecting from #{self}"
        @connection_state = :disconnecting
    end
    queue_call false, :disconnect
end

#disconnected(event = :failed) ⇒ Object

Called when the peer acknowledged the fact that we disconnected



# File 'lib/roby/distributed/communication.rb', line 311

def disconnected(event = :failed) # :nodoc:
    Roby::Distributed.info "#{remote_name} disconnected (#{event})"

    connection_space.synchronize do
        Distributed.peers.delete(remote_id)
    end

    synchronize do
        @connection_state = :disconnected

        if @send_thread && @send_thread != Thread.current
            begin
                @send_queue.clear
                @send_queue.push nil
                mutex.unlock
                @send_thread.join
            ensure
                mutex.lock
            end
        end
        @send_thread = nil

        proxies.each_value do |obj|
            obj.remote_siblings.delete(self)
        end
        proxies.clear
        removing_proxies.clear

        socket.close unless socket.closed?
    end

    Roby.once do
        task.emit(event)
    end
end

#disconnected! ⇒ Object

Call to disconnect outside of the normal protocol.

# File 'lib/roby/distributed/communication.rb', line 348

def disconnected!
    connection_space.synchronize do
        connection_space.aborted_connections[remote_id] = self
    end
    disconnected(:aborted)
end

#disconnected? ⇒ Boolean

Returns true if the connection with this peer has been removed

Returns:

  • (Boolean)

# File 'lib/roby/distributed/communication.rb', line 361

def disconnected?; connection_state == :disconnected end

#disconnecting? ⇒ Boolean

Returns true if we disconnected on our side but the peer did not acknowledge it yet

Returns:

  • (Boolean)

# File 'lib/roby/distributed/communication.rb', line 359

def disconnecting?; connection_state == :disconnecting end

#discover_neighborhood(object, distance) {|local_object(remote_object(object))| ... } ⇒ Object

Discovers all objects at a distance of distance from object. The object can be either a remote proxy or the remote object itself.

Yields:

  • (local_object(remote_object(object)))

# File 'lib/roby/distributed/peer.rb', line 517

def discover_neighborhood(object, distance)
    objects = ValueSet.new
    Roby.condition_variable(true) do |synchro, mutex|
        mutex.synchronize do
            transmit(:discover_neighborhood, object, distance) do |edges|
                edges = local_object(edges)
                edges.each do |rel, from, to, info|
                    objects << from.root_object << to.root_object
                end
                Roby::Distributed.update_all(objects) do
                    edges.each do |rel, from, to, info|
                        from.add_child_object(to, rel, info)
                    end
                end

                objects.each { |obj| Roby::Distributed.keep.ref(obj) }

                synchro.broadcast
            end
            synchro.wait(mutex)
        end
    end

    yield(local_object(remote_object(object)))

    Roby::Control.synchronize do
        objects.each { |obj| Roby::Distributed.keep.deref(obj) }
    end
end

#droby_dump(dest = nil) ⇒ Object

Returns an intermediate representation of self suitable to be sent to the dest peer.



# File 'lib/roby/distributed/protocol.rb', line 286

def droby_dump(dest = nil)
    @__droby_marshalled__ ||= DRoby.new(remote_name, remote_id)
end

#enable_rx ⇒ Object

Enables the receiving part of the communication link. It is an accumulator: if #enable_rx is called twice, then RX will be disabled only when #disable_rx is also called twice.

# File 'lib/roby/distributed/communication.rb', line 384

def enable_rx; @disabled_rx -= 1 end

#enable_tx ⇒ Object

Enables the sending part of the communication link. It is an accumulator: if #enable_tx is called twice, then TX will be disabled only when #disable_tx is also called twice.

# File 'lib/roby/distributed/communication.rb', line 374

def enable_tx; @disabled_tx -= 1 end
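The accumulator semantics of the disable/enable pairs can be seen in isolation with a small counter. `LinkSwitch` is a hypothetical class written for this example. It mirrors the one-line methods shown above for a single direction of the link.

```ruby
# Hypothetical stand-in for the disable/enable counter of one direction.
class LinkSwitch
  def initialize; @disabled = 0 end
  def disable;    @disabled += 1 end
  def enable;     @disabled -= 1 end
  def disabled?;  @disabled > 0 end
end

tx = LinkSwitch.new
tx.disable
tx.disable
tx.enable
still_disabled = tx.disabled? # one disable call is still outstanding
tx.enable
finally_disabled = tx.disabled?
```

Counting instead of storing a boolean lets independent callers disable the link without having to know about each other: the link only comes back once every one of them has re-enabled it.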

#fatal_error(error, msg, args) ⇒ Object

error has been raised while we were processing msg(*args). This error cannot be recovered, and the connection to the peer will be closed.

This sends the PeerServer#fatal_error message to our peer

# File 'lib/roby/distributed/communication.rb', line 301

def fatal_error(error, msg, args)
    synchronize do
        Roby::Distributed.fatal "fatal error '#{error.message}' while processing #{msg}(#{args.join(", ")})"
        Roby::Distributed.fatal Roby.filter_backtrace(error.backtrace).join("\n  ")
        @connection_state = :disconnecting
    end
    queue_call false, :fatal_error, [error, msg, args]
end

#find_tasks ⇒ Object

For thread-safe operation, always use #each on the resulting query: during the enumeration, the local plan GC will not remove those tasks.

# File 'lib/roby/distributed/peer.rb', line 332

def find_tasks
    Roby::Query.new(self)
end

#incremental_dump?(object) ⇒ Boolean

This method is used by Distributed.format to determine the dumping policy for object. If the method returns true, then only the RemoteID object of object will be sent to the peer. Otherwise, an intermediate object describing object is sent.

Returns:

  • (Boolean)


# File 'lib/roby/distributed/peer.rb', line 260

def incremental_dump?(object)
    object.respond_to?(:remote_siblings) && object.remote_siblings[self] 
end

#link_alive? ⇒ Boolean

Checks if the connection is currently alive, i.e. if we can send data on the link. A dead link does not mean that we no longer have any interaction with the peer: it only means that we cannot currently communicate with it.

Returns:

  • (Boolean)

# File 'lib/roby/distributed/communication.rb', line 392

def link_alive?
    return false if socket.closed? || @dead || @disabled_tx > 0
    return false unless !remote_id || connection_space.neighbours.find { |n| n.remote_id == remote_id }
    true
end

#link_dead! ⇒ Object

Mark the link as dead regardless of the last neighbour discovery. This will be reset during the next neighbour discovery.

# File 'lib/roby/distributed/communication.rb', line 365

def link_dead!; @dead = true end

#local_name ⇒ Object

The name of the local ConnectionSpace object we are acting on

# File 'lib/roby/distributed/peer.rb', line 272

def local_name; connection_space.name end

#local_object(marshalled, create = true) ⇒ Object Also known as: proxy

Returns the local object for object. object can be either a marshalled object or a local proxy. Raises ArgumentError if it is none of the two. In the latter case, a RemotePeerMismatch exception is raised if the local proxy is not known to this peer.



# File 'lib/roby/distributed/peer.rb', line 472

def local_object(marshalled, create = true)
    if marshalled.kind_of?(RemoteID)
        return marshalled.to_local(self, create)
    elsif !marshalled.respond_to?(:proxy)
        return marshalled
    elsif marshalled.respond_to?(:remote_siblings)
        # 1/ try any local RemoteID reference registered in the marshalled object
        local_id  = marshalled.remote_siblings[Roby::Distributed.droby_dump]
        if local_id
            local_object = local_id.local_object rescue nil
            local_object = nil if local_object.finalized?
        end

        # 2/ try the #proxies hash
        if !local_object
            remote_id = marshalled.remote_siblings[droby_dump]
            unless local_object = proxies[remote_id]
                return if !create

                # remove any local ID since we are re-creating it
                marshalled.remote_siblings.delete(Roby::Distributed.droby_dump)
                local_object = marshalled.proxy(self)
            end
        end

        if !local_object
            raise "no remote siblings for #{remote_name} in #{marshalled} (#{marshalled.remote_siblings})"
        end

        if marshalled.respond_to?(:update)
            Roby::Distributed.update(local_object) do
                marshalled.update(self, local_object)
            end
        end
        proxy_setup(local_object)
    else
        local_object = marshalled.proxy(self)
    end

    local_object
end

#objects(object, create_local = true) ⇒ Object

Returns the remote_object, local_object pair for object. object can be either a marshalled object or a local proxy. Raises ArgumentError if it is none of the two. In the latter case, a RemotePeerMismatch exception is raised if the local proxy is not known to this peer.



# File 'lib/roby/distributed/peer.rb', line 436

def objects(object, create_local = true)
    if object.kind_of?(RemoteID)
        if local_proxy = proxies[object]
            proxy_setup(local_proxy)
            return [object, local_proxy]
        end
        raise ArgumentError, "got a RemoteID which has no proxy"
    elsif object.respond_to?(:proxy)
        [object.remote_object, proxy(object, create_local)]
    else
        [object.sibling_on(self), object]
    end
end

#on(matcher, &block) ⇒ Object

This sends the PeerServer#add_trigger message to the peer.



# File 'lib/roby/distributed/peer.rb', line 385

def on(matcher, &block)
    triggers[matcher.object_id] = [matcher, block]
    transmit(:add_trigger, matcher.object_id, matcher)
end

#owns?(object) ⇒ Boolean

Returns true if this peer owns object

Returns:

  • (Boolean)


# File 'lib/roby/distributed/peer.rb', line 418

def owns?(object); object.owners.include?(self) end

#proxy_setup(local_object) ⇒ Object



# File 'lib/roby/distributed/peer.rb', line 450

def proxy_setup(local_object)
    if local_object.respond_to?(:execution_agent) && 
	local_object.owners.size == 1 && 
	owns?(local_object) &&
	!local_object.execution_agent &&
	local_object.plan

	remote_owner = local_object.owners.first
	connection_task = local_object.plan[self.task]

	Roby::Distributed.update_all([local_object, connection_task]) do
	    local_object.executed_by connection_task
	end
    end

    local_object
end

#push_subscription(object) ⇒ Object

Make our peer subscribe to object



# File 'lib/roby/distributed/subscription.rb', line 265

def push_subscription(object)
    local_server.subscribe(object)
    synchro_point
end

#query_each(result_set) ⇒ Object

Yields the tasks saved in result_set by #query_result_set. During the enumeration, the tasks are marked as permanent to avoid plan GC. The block can subscribe to the ones that are interesting. After the block has returned, all non-subscribed tasks will be subject to plan GC.



# File 'lib/roby/distributed/peer.rb', line 359

def query_each(result_set) # :nodoc:
    result_set.each do |task|
	yield(task)
    end

ensure
    Roby::Control.synchronize do
	if result_set
	    result_set.each do |task|
		Roby::Distributed.keep.deref(task)
	    end
	end
    end
end
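
The pairing between #query_result_set (which references each task) and #query_each (which dereferences them in an ensure clause) can be sketched as follows. This is a simplified model under stated assumptions: KeepSet is a hypothetical reference-counted set standing in for Roby::Distributed.keep, and an explicit subscription is modelled as one extra reference.

```ruby
# Hypothetical reference-counted set, standing in for
# Roby::Distributed.keep.
class KeepSet
  def initialize
    @counts = Hash.new(0)
  end

  def ref(obj)
    @counts[obj] += 1
  end

  def deref(obj)
    @counts[obj] -= 1
    @counts.delete(obj) if @counts[obj] <= 0
  end

  def kept?(obj)
    @counts.key?(obj)
  end
end

KEEP = KeepSet.new

# Analogue of #query_result_set: protect every task before returning it.
def result_set_for(tasks)
  tasks.each { |task| KEEP.ref(task) }
  tasks
end

# Analogue of #query_each: the protection is released even if the
# block raises.
def each_result(result_set)
  result_set.each { |task| yield(task) }
ensure
  result_set.each { |task| KEEP.deref(task) } if result_set
end

set = result_set_for(%i[t1 t2 t3])
each_result(set) do |task|
  KEEP.ref(task) if task == :t2   # models subscribing to an interesting task
end
# Only :t2 is still protected once the enumeration is over.
```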

#query_result_set(query) ⇒ Object

Returns a set of remote tasks for query applied on the remote plan. This is not to be accessed directly; it is part of the Query interface.

See #find_tasks.



# File 'lib/roby/distributed/peer.rb', line 341

def query_result_set(query)
    result = ValueSet.new
    call(:query_result_set, query) do |marshalled_set|
	for task in marshalled_set
	    task = local_object(task)
	    Roby::Distributed.keep.ref(task)
	    result << task
	end
    end

    result
end

#queue_call(is_callback, m, args = [], on_completion = nil, waiting_thread = nil) ⇒ Object

Add a CallSpec object in #send_queue. Do not use that method directly, but use #transmit and #call instead.

The message to be sent is m(*args). on_completion is either nil or a block object which should be called once the message has been processed by our remote peer. waiting_thread is a Thread object of a thread waiting for the message to be processed. #raise will be called on it if an error has occurred during the remote processing.

If is_callback is true, it means that the message is being queued during the processing of another message. In that case, we will receive the completion message only when all callbacks have also been processed. Queueing callbacks while processing another callback is forbidden and the communication layer raises RecursiveCallbacksError if it happens.

#queueing allows normal messages to be queued when they would otherwise have been marked as callbacks.



# File 'lib/roby/distributed/communication.rb', line 652

def queue_call(is_callback, m, args = [], on_completion = nil, waiting_thread = nil)
    # Do some sanity checks
    if !m.respond_to?(:to_sym)
        raise ArgumentError, "method argument should be a symbol, was #{m.class}"
    end

    # Check the connection state
    if (disconnecting? && m != :disconnect && m != :fatal_error) || disconnected?
        raise DisconnectedError, "cannot queue #{m}(#{args.join(", ")}), we are not currently connected to #{remote_name}"
    end

    # Marshal DRoby-dumped objects now, since the object may be
    # modified between now and the time it is sent
    formatted_args = Distributed.format(args, self)

    if Roby::Distributed::DEBUG_MARSHALLING
        check_marshallable(formatted_args)
    end

    call_spec = CallSpec.new(is_callback,
                             m, formatted_args, args,
                             on_completion, caller(2), waiting_thread)

    synchronize do
        # No return message for 'completed' (of course)
        if call_spec.method != :completed
            @@message_id += 1
            call_spec.message_id = @@message_id
            completion_queue << call_spec

        elsif !current_cycle.empty? && !(args[0] || args[1])
            # Try to merge empty completed messages
            last_call = current_cycle.last
            last_method, last_args = last_call[1], last_call[2]

            case last_method
            when :completed
                if !(last_args[0] || last_args[1])
                    Distributed.debug "merging two completion messages"
                    current_cycle.pop
                    call_spec.method = :completion_group
                    call_spec.formatted_args = [last_args[2], args[2]]
                end
            when :completion_group
                Distributed.debug "extending a completion group"
                current_cycle.pop
                call_spec.method = :completion_group
                call_spec.formatted_args = [last_args[0], args[2]]
            end
        end

        Distributed.debug { "#{call_spec.is_callback ? 'adding callback' : 'queueing'} [#{call_spec.message_id}]#{remote_name}.#{call_spec.method}" }
        current_cycle << [call_spec.is_callback, call_spec.method, call_spec.formatted_args, !waiting_thread, call_spec.message_id]
        if sync? || CYCLE_END_CALLS.include?(m)
            send_queue << current_cycle
            @current_cycle = Array.new
        end
    end
end
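
The batching at the end of #queue_call, where calls accumulate in the current cycle and the whole cycle is pushed onto the send queue at once, can be sketched on its own. This is a simplified model and not the Roby code: CycleBatcher is hypothetical, the contents of CYCLE_END_CALLS are an assumption (the real constant is not shown here), and marshalling, completion merging and connection checks are left out.

```ruby
# Hypothetical, simplified model of the cycle batching in #queue_call.
class CycleBatcher
  # Assumed value for illustration; the real list is not shown in this page.
  CYCLE_END_CALLS = [:state_update].freeze

  attr_reader :send_queue, :current_cycle

  def initialize(sync: false)
    @sync = sync
    @send_queue = Queue.new
    @current_cycle = []
    @mutex = Mutex.new
    @message_id = 0
  end

  def queue_call(m, *args)
    unless m.respond_to?(:to_sym)
      raise ArgumentError, "method argument should be a symbol, was #{m.class}"
    end

    @mutex.synchronize do
      @message_id += 1
      @current_cycle << [m, args, @message_id]
      # Flush the whole cycle in one go, either on every call (sync mode)
      # or when a call marks the end of a cycle.
      if @sync || CYCLE_END_CALLS.include?(m)
        @send_queue << @current_cycle
        @current_cycle = []
      end
    end
  end
end

batcher = CycleBatcher.new
batcher.queue_call(:subscribe, 1)
batcher.queue_call(:update, 2)       # still buffered in current_cycle
batcher.queue_call(:state_update)    # flushes all three calls as one batch
```

Grouping calls per cycle lets the sending thread write one batch per cycle instead of one message per call.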

#queueing ⇒ Object

Calls to #transmit made inside the block given to #queueing are queued as normal messages instead of being marked as callbacks.



# File 'lib/roby/distributed/communication.rb', line 714

def queueing
    old_processing = local_server.processing?

    local_server.processing = false
    yield

ensure
    local_server.processing = old_processing
end
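
The save-and-restore pattern used by #queueing can be tried out without a running peer. In this sketch, FakeServer is a hypothetical stand-in for PeerServer that models only the processing flag, and the server is passed explicitly instead of being read from local_server.

```ruby
# Hypothetical stand-in for PeerServer; only the processing flag is modelled.
class FakeServer
  attr_writer :processing

  def initialize
    @processing = false
  end

  def processing?
    @processing
  end
end

# Same shape as Peer#queueing, with the server passed in explicitly.
def queueing(server)
  old_processing = server.processing?
  server.processing = false
  yield
ensure
  # Runs on normal return and on exceptions alike.
  server.processing = old_processing
end

server = FakeServer.new
server.processing = true   # pretend we are serving a remote request
inside = queueing(server) { server.processing? }
# inside is false, and server.processing? is true again afterwards
```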

#reconnect ⇒ Object

Reconnect to this peer after the socket has been closed



# File 'lib/roby/distributed/communication.rb', line 256

def reconnect
    local_token = ConnectionToken.new

    connection_space.synchronize do
        call = [:reconnect, local_token, connection_space.name, connection_space.remote_id]
        Peer.send_connection_request(connection_space, self, call, local_token)
    end
end

#reconnected(socket) ⇒ Object

Called when we managed to reconnect to our peer. socket is the new communication socket



# File 'lib/roby/distributed/communication.rb', line 266

def reconnected(socket)
    Roby::Distributed.debug "new socket for #{self}: #{socket.peer_info}"
    connection_space.pending_sockets << [socket, self]
    @socket = socket
end

#remote_object(object) ⇒ Object

Returns the remote object for object. object can be either a DRbObject, a marshalled object or a local proxy. In the latter case, a RemotePeerMismatch exception is raised if the local proxy is not known to this peer.



# File 'lib/roby/distributed/peer.rb', line 424

def remote_object(object)
    if object.kind_of?(RemoteID)
	object
    else object.sibling_on(self)
    end
end

#remove_trigger(id) ⇒ Object

Remove a trigger referenced by its ID. id is the identifier under which Peer#on registered the trigger (matcher.object_id).

This sends the PeerServer#remove_trigger message to the peer.



# File 'lib/roby/distributed/peer.rb', line 394

def remove_trigger(id)
    transmit(:remove_trigger, id)
    triggers.delete(id)
end

#report_remote_error(call, error) ⇒ Object

Formats an error message for error, which the remote peer reported while processing call



# File 'lib/roby/distributed/communication.rb', line 841

def report_remote_error(call, error)
    # The filter rejects lines that mention dRb library files (drb/<name>.rb)
    error_message = error.full_message { |msg| msg !~ /drb\/\w+\.rb/ }
    if call
        "#{remote_name} reports an error on #{call}:\n#{error_message}\n" +
            "call was initiated by\n  #{call.trace.join("\n  ")}"
    else
        "#{remote_name} reports an error on:\n#{error_message}"
    end
end
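
The kind of filtering applied to the error message can be sketched with plain Ruby exceptions. This is an illustration only: it assumes that the block given to full_message selects which backtrace lines are kept, and filtered_backtrace and DRB_FRAME are hypothetical names that do not exist in Roby.

```ruby
# Matches backtrace lines that point into a dRb library file (drb/<name>.rb).
DRB_FRAME = %r{drb/\w+\.rb}

# Hypothetical helper: returns the backtrace without the dRb frames.
def filtered_backtrace(error)
  (error.backtrace || []).reject { |line| line =~ DRB_FRAME }
end

error = RuntimeError.new("remote failure")
error.set_backtrace([
  "/usr/lib/ruby/drb/drb.rb:1624:in `perform'",
  "lib/roby/distributed/peer.rb:436:in `objects'",
  "/usr/lib/ruby/drb/extserv.rb:21:in `block'"
])

frames = filtered_backtrace(error)
# Only the frame located in peer.rb is left.
```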

#subscribe(object) ⇒ Object

Explicitly subscribe to object

See also #subscriptions, #subscribed? and #unsubscribe



# File 'lib/roby/distributed/subscription.rb', line 245

def subscribe(object)
    while object.respond_to?(:__getobj__)
        object = object.__getobj__
    end

    if remote_object = (remote_object(object) rescue nil)
        if !subscriptions.include?(remote_object)
            remote_object = nil
        end
    end

    unless remote_object
        remote_sibling = object.sibling_on(self)
        remote_object = call(:subscribe, remote_sibling)
        synchro_point
    end
    local_object = local_object(remote_object)
end
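
The first loop of #subscribe strips any delegator wrapped around the object before looking up its sibling. A minimal sketch of that step, using SimpleDelegator from Ruby's standard library as the wrapper (the wrappers actually used in Roby are not shown here, so that choice is an assumption) and a hypothetical unwrap helper:

```ruby
require 'delegate'

# Hypothetical helper: follow __getobj__ until a plain object is reached.
def unwrap(object)
  object = object.__getobj__ while object.respond_to?(:__getobj__)
  object
end

task = "some task"
wrapped = SimpleDelegator.new(SimpleDelegator.new(task))

unwrap(wrapped).equal?(task)   # true: both wrapper layers were removed
unwrap(task).equal?(task)      # true: plain objects are returned as-is
```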

#subscribe_plan ⇒ Object

Subscribe to the remote plan



# File 'lib/roby/distributed/subscription.rb', line 274

def subscribe_plan
    call(:subscribe_plan, connection_space.plan.remote_id)
    synchro_point
end

#subscribed?(object) ⇒ Boolean

True if we are explicitly subscribed to object. Automatically subscribed objects will not be included here, but BasicObject#updated? will return true for them.

See also #subscriptions, #subscribe and #unsubscribe

Returns:

  • (Boolean)


# File 'lib/roby/distributed/subscription.rb', line 295

def subscribed?(object)
    subscriptions.include?(remote_object(object))
rescue RemotePeerMismatch
    false
end

#subscribed_plan? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/roby/distributed/subscription.rb', line 288

def subscribed_plan?; remote_plan && subscriptions.include?(remote_plan) end

#synchro_point ⇒ Object



# File 'lib/roby/distributed/communication.rb', line 868

def synchro_point; call(:synchro_point) end

#synchronize ⇒ Object



# File 'lib/roby/distributed/communication.rb', line 578

def synchronize; @mutex.synchronize { yield } end

#to_s ⇒ Object

:nodoc:



# File 'lib/roby/distributed/peer.rb', line 253

def to_s # :nodoc:
    "Peer:#{remote_name}" 
end

#transaction_give_token(trsc, needs_edition) ⇒ Object

Give the edition token on trsc to the given peer. needs_edition is a flag which, if true, requests that the token is given back at least once to the local plan manager.

Do not use this directly, it is part of the multi-robot communication protocol. Use the edition-related methods on Distributed::Transaction instead.



# File 'lib/roby/distributed/transaction.rb', line 493

def transaction_give_token(trsc, needs_edition)
    call(:transaction_give_token, trsc, needs_edition)
end

#transaction_propose(trsc) ⇒ Object

Send the information related to the given transaction to the remote plan manager.



# File 'lib/roby/distributed/transaction.rb', line 480

def transaction_propose(trsc)
    synchro_point
    create_sibling(trsc)
    nil
end

#transmit(m, *args, &block) ⇒ Object

Asynchronous call to the remote host. If a block is given, it is called in the communication thread when the call succeeds, with the returned value as argument.



# File 'lib/roby/distributed/communication.rb', line 730

def transmit(m, *args, &block)
    is_callback = Roby.inside_control? && local_server.processing?
    if is_callback && local_server.processing_callback?
        raise RecursiveCallbacksError, "cannot queue callback #{m}(#{args.join(", ")}) while serving one"
    end

    queue_call is_callback, m, args, block
end
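
The decision made by #transmit, between a normal message, a callback and a forbidden nested callback, can be isolated in a small sketch. The names below are hypothetical: ServerState stands in for the two PeerServer flags, classify_call for the first lines of #transmit, and RecursiveCallbacksError is redefined locally so that the example is self-contained.

```ruby
# Local stand-in for the Roby exception class, defined here only so the
# sketch runs on its own.
class RecursiveCallbacksError < RuntimeError; end

# Hypothetical model of the two PeerServer flags read by #transmit.
ServerState = Struct.new(:processing, :processing_callback)

# Returns :callback or :normal, or raises if a callback would be queued
# while another callback is being served.
def classify_call(state, inside_control, m)
  is_callback = inside_control && state.processing
  if is_callback && state.processing_callback
    raise RecursiveCallbacksError, "cannot queue callback #{m} while serving one"
  end
  is_callback ? :callback : :normal
end

classify_call(ServerState.new(false, false), true, :update)  # :normal
classify_call(ServerState.new(true, false), true, :update)   # :callback
```

With both flags set, the same call raises RecursiveCallbacksError, which is the situation #queueing is meant to avoid.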

#triggered(id, task) ⇒ Object

Calls the block given to Peer#on in a separate thread when task has matched the trigger



# File 'lib/roby/distributed/peer.rb', line 401

def triggered(id, task) # :nodoc:
    task = local_object(task)
    Roby::Distributed.keep.ref(task)
    Thread.new do
	begin
	    if trigger = triggers[id]
		trigger.last.call(task)
	    end
	rescue Exception
	    Roby.warn "trigger handler #{trigger.last} failed with #{$!.full_message}"
	ensure
	    Roby::Distributed.keep.deref(task)
	end
    end
end

#unsubscribe(object) ⇒ Object

Remove an explicit subscription.

See also #subscriptions, #subscribe and #subscribed?



# File 'lib/roby/distributed/subscription.rb', line 305

def unsubscribe(object)
    subscriptions.delete(remote_object(object))
end

#unsubscribe_plan ⇒ Object

Unsubscribe from the remote plan



# File 'lib/roby/distributed/subscription.rb', line 280

def unsubscribe_plan
    proxies.delete(remote_plan)
    subscriptions.delete(remote_plan)
    if connected?
        call(:removed_sibling, @remote_plan, connection_space.plan.remote_id)
    end
end