Class: Roby::Distributed::ConnectionSpace

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped
Defined in:
lib/roby/distributed/connection_space.rb,
lib/roby/distributed/peer.rb

Overview

This class manages the connections between this plan manager and the remote plan managers

  • there is only one reception thread, to which all peers send data

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) {|_self| ... } ⇒ ConnectionSpace

Creates a new ConnectionSpace object. The following options can be provided:

name

the name of this plan manager. Defaults to <hostname>-<PID>

period

the discovery period [default: nil]. It is mandatory when ring_discovery is enabled (an ArgumentError is raised otherwise)

ring_discovery

whether or not ring discovery should be attempted [default: true]

ring_broadcast

the broadcast address for ring discovery

discovery_tuplespace

the DRbObject referencing the remote tuplespace which holds references to plan managers [default: nil]

plan

the plan this ConnectionSpace acts on. [default: Roby.plan]

listen_at

the port at which we should listen for incoming connections [default: 0]

Yields:

  • (_self)

Yield Parameters:



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/roby/distributed/connection_space.rb', line 158

# Creates a new ConnectionSpace. The accepted options are :name, :period,
# :ring_discovery, :ring_broadcast, :discovery_tuplespace, :plan and
# :listen_at (see the defaults below). :period is mandatory when
# :ring_discovery is enabled.
#
# The object yields itself (if a block is given) before it starts listening.
# It then opens the server socket, publishes itself on the central
# tuplespace (if any), starts the neighbour discovery thread, runs a first
# blocking discovery, starts the reception thread and registers #quit as a
# Roby::Control finalizer.
#
# Raises ArgumentError if ring discovery is requested without a period.
def initialize(options = {})
  super()

  options = validate_options options,
    :name => "#{Socket.gethostname}-#{Process.pid}", # the name of this host
    :period => nil,                # the discovery period
    :ring_discovery => true,       # whether we should do discovery based on Rinda::RingFinger
    :ring_broadcast => '',         # the broadcast address for discovery
    :discovery_tuplespace => nil,  # a central tuplespace which lists hosts (including ourselves)
    :plan => nil,                  # the plan we publish, uses Roby.plan if nil
    :listen_at => 0                # the port at which we listen for incoming connections

  if options[:ring_discovery] && !options[:period]
    raise ArgumentError, "you must provide a discovery period when using ring discovery"
  end

  @name                 = options[:name]
  @neighbours           = Array.new
  @peers                = Hash.new
  @plan                 = options[:plan] || Roby.plan
  @discovery_period     = options[:period]
  @ring_discovery       = options[:ring_discovery]
  @ring_broadcast       = options[:ring_broadcast]
  @discovery_tuplespace = options[:discovery_tuplespace]
  # NOTE(review): :port is not among the options declared above, so this is
  # presumably always nil; the listening port is given by :listen_at
  @port                 = options[:port]
  @pending_sockets       = Queue.new
  @pending_connections   = Hash.new
  @aborted_connections   = Hash.new
  @pending_reconnections = Array.new
  @quit_neighbour_thread = false

  @mutex                = Mutex.new
  @start_discovery      = ConditionVariable.new
  @finished_discovery   = ConditionVariable.new
  @new_neighbours       = Queue.new

  @connection_listeners = Array.new

  yield(self) if block_given?

  listen(options[:listen_at])
  @remote_id = RemoteID.new(Socket.gethostname, server_socket.port)

  if central_discovery?
    published =
      begin
        discovery_tuplespace.write([:droby, name, remote_id])
      rescue StandardError
        nil
      end

    # A local tuplespace has no DRb URI, so only ask remote ones for it
    location =
      if discovery_tuplespace.kind_of?(DRbObject)
        discovery_tuplespace.__drburi
      else
        "local tuplespace"
      end

    if published
      Distributed.info "published #{name}(#{remote_id}) on #{location}"
    else
      Distributed.warn "cannot connect to #{location}, disabling centralized discovery"
      # Must reset the attribute (and not a local variable) so that
      # central_discovery? actually becomes false
      @discovery_tuplespace = nil
    end
  end

  if ring_discovery?
    Distributed.info "doing ring discovery on #{ring_broadcast}"
  end

  synchronize do
    # Start the discovery thread and wait for it to be initialized
    @discovery_thread = Thread.new(&method(:neighbour_discovery))
    finished_discovery.wait(mutex)
  end
  start_neighbour_discovery(true)

  receive

  Roby::Control.finalizers << method(:quit)
end

Instance Attribute Details

#aborted_connectionsObject (readonly)

A remote_id => thread map of the connection threads

See Peer.connection_request, Peer.initiate_connection and Peer#reconnect



114
115
116
# File 'lib/roby/distributed/connection_space.rb', line 114

# remote_id => thread map (see Peer.connection_request,
# Peer.initiate_connection and Peer#reconnect)
def aborted_connections; @aborted_connections end

#connection_listenersObject (readonly)

An array of procs called at the end of the neighbour discovery, after #neighbours have been updated



338
339
340
# File 'lib/roby/distributed/connection_space.rb', line 338

# Procs called at the end of each neighbour discovery pass
def connection_listeners; @connection_listeners end

#discovery_periodObject (readonly)

The period at which we do discovery



118
119
120
# File 'lib/roby/distributed/connection_space.rb', line 118

# Period at which discovery is done (the :period option)
def discovery_period; @discovery_period end

#discovery_threadObject (readonly)

The discovery thread



120
121
122
# File 'lib/roby/distributed/connection_space.rb', line 120

# The neighbour discovery thread, or nil once it has quit
def discovery_thread; @discovery_thread end

#discovery_tuplespaceObject (readonly)

The central tuplespace where neighbours are announced



129
130
131
# File 'lib/roby/distributed/connection_space.rb', line 129

# Central tuplespace on which neighbours are announced (may be nil)
def discovery_tuplespace; @discovery_tuplespace end

#finished_discoveryObject (readonly)

A condition variable which is signalled when discovery finishes



135
136
137
# File 'lib/roby/distributed/connection_space.rb', line 135

# Condition variable broadcast when a discovery pass finishes
def finished_discovery; @finished_discovery end

#last_discoveryObject (readonly)

Last time a discovery finished



131
132
133
# File 'lib/roby/distributed/connection_space.rb', line 131

# Start time of the last finished discovery pass
def last_discovery; @last_discovery end

#mutexObject (readonly)

The main mutex which is used for synchronization with the discovery thread



139
140
141
# File 'lib/roby/distributed/connection_space.rb', line 139

# Main mutex, used to synchronize with the discovery thread
def mutex; @mutex end

#nameObject (readonly)

Our name on the network



145
146
147
# File 'lib/roby/distributed/connection_space.rb', line 145

# Our name on the network
def name; @name end

#new_neighboursObject (readonly)

A queue containing all new neighbours



104
105
106
# File 'lib/roby/distributed/connection_space.rb', line 104

# Queue of [connection_space, neighbour] pairs for newly found neighbours
def new_neighbours; @new_neighbours end

#peersObject (readonly)

A remote_id => Peer map of the connected peers



106
107
108
# File 'lib/roby/distributed/connection_space.rb', line 106

# remote_id => Peer map of the connected peers
def peers; @peers end

#pending_connectionsObject (readonly)

A remote_id => thread map of the connection threads

See Peer.connection_request and Peer.initiate_connection



110
111
112
# File 'lib/roby/distributed/connection_space.rb', line 110

# remote_id => thread map of the running connection threads
def pending_connections; @pending_connections end

#pending_reconnectionsObject (readonly)

The set of peers for which we have lost the link



116
117
118
# File 'lib/roby/distributed/connection_space.rb', line 116

# Peers for which the link has been lost
def pending_reconnections; @pending_reconnections end

#pending_socketsObject (readonly)

The set of new sockets to wait for. If one of these is closed, Distributed.receive will check whether we are supposed to be connected to the peer. If that is not the case, the socket will be ignored.



257
258
259
# File 'lib/roby/distributed/connection_space.rb', line 257

# Queue of [socket, peer] pairs that the reception thread must start watching
def pending_sockets; @pending_sockets end

#planObject (readonly)

The plan we are publishing, usually Roby.plan



142
143
144
# File 'lib/roby/distributed/connection_space.rb', line 142

# The plan we publish (Roby.plan unless :plan was given)
def plan; @plan end

#remote_idObject (readonly)

The RemoteID object which allows to reference this ConnectionSpace on the network



251
252
253
# File 'lib/roby/distributed/connection_space.rb', line 251

# RemoteID referencing this ConnectionSpace on the network
def remote_id; @remote_id end

#ring_broadcastObject (readonly)

The list of broadcasting addresses to search for plan databases



125
126
127
# File 'lib/roby/distributed/connection_space.rb', line 125

# Broadcast address(es) used by ring discovery
def ring_broadcast; @ring_broadcast end

#server_socketObject (readonly)

The socket on which we listen for incoming connections



147
148
149
# File 'lib/roby/distributed/connection_space.rb', line 147

# Socket on which incoming connections are accepted
def server_socket; @server_socket end

#start_discoveryObject (readonly)

A condition variable which is signalled to start a new discovery



133
134
135
# File 'lib/roby/distributed/connection_space.rb', line 133

# Condition variable signalled to start a new discovery pass
def start_discovery; @start_discovery end

Instance Method Details

#add_owner(object, peer) ⇒ Object



18
19
20
# File 'lib/roby/distributed/peer.rb', line 18

# Forwards to object.add_owner, passing false as second argument
def add_owner(object, peer)
  object.add_owner(peer, false)
end

#central_discovery?Boolean

If we are doing discovery based on a central tuplespace

Returns:

  • (Boolean)


127
# File 'lib/roby/distributed/connection_space.rb', line 127

# True if discovery uses a central tuplespace
def central_discovery?
  @discovery_tuplespace ? true : false
end

#discovering?Boolean

Returns:

  • (Boolean)


325
326
327
328
329
330
331
332
# File 'lib/roby/distributed/connection_space.rb', line 325

# Returns true if a discovery pass has been requested and has not finished
# yet (nil otherwise). In that case, the block (if any) is yielded while the
# mutex is held.
def discovering?
  synchronize do
    next if @last_discovery == @discovery_start
    yield if block_given?
    true
  end
end

#discovery_portObject



340
341
342
343
344
345
# File 'lib/roby/distributed/connection_space.rb', line 340

# Port used for ring discovery: the one of Distributed.server if there is
# one, DISCOVERY_RING_PORT otherwise
def discovery_port
  server = Distributed.server
  server ? server.port : DISCOVERY_RING_PORT
end

#droby_dump(dest = nil) ⇒ Object

Define #droby_dump for Peer-like behaviour



462
# File 'lib/roby/distributed/connection_space.rb', line 462

# Peer-like marshalling: a Peer::DRoby built from our name and remote_id,
# computed once and cached. +dest+ is ignored.
def droby_dump(dest = nil)
  return @__droby_marshalled__ if @__droby_marshalled__
  @__droby_marshalled__ = Peer::DRoby.new(name, remote_id)
end

#listen(port) ⇒ Object

Sets up a separate thread which listens for connection



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/roby/distributed/connection_space.rb', line 231

# Opens the server socket on +port+ (0 picks a free port) and starts a
# thread which hands every accepted connection to Peer.connection_request.
#
# A failure while handling one request is logged and that connection is
# closed; the thread keeps accepting. The thread stops when accept raises.
# IOError is expected when #quit closes the server socket and is ignored;
# any other error is logged instead of being silently swallowed.
def listen(port)
  @server_socket = TCPServer.new(nil, port)
  server_socket.listen(10)
  Thread.new do
    begin
      while (new_connection = server_socket.accept)
        begin
          Peer.connection_request(self, new_connection)
        rescue Exception => e
          Roby::Distributed.fatal "failed to handle connection request on #{new_connection}"
          Roby::Distributed.fatal e.full_message
          new_connection.close
        end
      end
    rescue IOError
      # server_socket has been closed (see #quit): normal termination
    rescue Exception => e
      Roby::Distributed.fatal "ConnectionSpace#listen: stopped accepting connections"
      Roby::Distributed.fatal e.full_message
    end
  end
end

#neighbour_discoveryObject

Loop which does neighbour_discovery



348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/roby/distributed/connection_space.rb', line 348

# Body of the neighbour discovery thread (started by #initialize).
#
# Each iteration does the following:
# * publishes the neighbours found by the previous pass into @neighbours,
#   pushing the ones that were not there before on #new_neighbours;
# * calls the #connection_listeners;
# * signals #finished_discovery;
# * waits on #start_discovery until #start_neighbour_discovery requests a
#   new pass;
# * reads the central tuplespace and/or does a Rinda::RingFinger lookup
#   bounded by #discovery_period.
#
# The thread terminates when interrupted (see #quit). On exit it clears the
# neighbour lists and forces the disconnection of all peers.
def neighbour_discovery
  Thread.current.priority = 2

  discovered = []

  # Initialize so that @discovery_start == discovery_start
  @discovery_start = nil
  discovery_start  = nil
  finger           = nil
  loop do
    return if @quit_neighbour_thread

    Control.synchronize do
      old_neighbours, @neighbours = @neighbours, []
      discovered.each do |neighbour|
        next if neighbour.remote_id == remote_id || @neighbours.include?(neighbour)
        @neighbours << neighbour
        new_neighbours << [self, neighbour] unless old_neighbours.include?(neighbour)
      end
      discovered.clear
    end

    connection_listeners.each { |listener| listener.call(self) }
    synchronize do
      @last_discovery = discovery_start
      finished_discovery.broadcast

      # Loop so that a spurious wakeup does not start a pass that nobody
      # requested (which would also leave discovery_start nil)
      while @discovery_start == @last_discovery
        start_discovery.wait(mutex)
        return if @quit_neighbour_thread
      end
      return if @quit_neighbour_thread
      discovery_start = @discovery_start

      if ring_discovery? && (!finger || (finger.port != discovery_port))
        finger = Rinda::RingFinger.new(ring_broadcast, discovery_port)
      end
    end

    if central_discovery?
      discovery_tuplespace.read_all([:droby, nil, nil]).each do |entry|
        next if entry[2] == remote_id
        discovered << Neighbour.new(entry[1], entry[2])
      end
    end

    if ring_discovery?
      # Use the local snapshot of the start time: @discovery_start can be
      # changed by #start_neighbour_discovery now that the mutex is released.
      # The timeout is used by lookup_ring to sleep, so it must not be
      # negative.
      remaining =
        if discovery_period
          [(discovery_start + discovery_period) - Time.now, 0].max
        end
      finger.lookup_ring(remaining) do |cs|
        next if cs == self

        discovered << Neighbour.new(cs.name, cs.remote_id)
      end
    end
  end

rescue Interrupt
rescue Exception => e
  Distributed.fatal "neighbour discovery died with\n#{e.full_message}"
  Distributed.fatal "Peers are: #{Distributed.peers.map { |id, peer| "#{id.inspect} => #{peer}" }.join(", ")}"

ensure
  Distributed.info "quit neighbour thread"
  # #neighbours returns a copy, so clear the list itself
  synchronize { @neighbours.clear }
  new_neighbours.clear

  # Force disconnection in case something got wrong in the normal
  # disconnection process
  Distributed.peers.values.each do |peer|
    peer.disconnected unless peer.disconnected?
  end

  synchronize do
    @discovery_thread = nil
    finished_discovery.broadcast
  end
end

#neighboursObject

List of discovered neighbours



102
# File 'lib/roby/distributed/connection_space.rb', line 102

# Returns a copy of the list of discovered neighbours, taken under the mutex
def neighbours
  synchronize do
    @neighbours.dup
  end
end

#owns?(object) ⇒ Boolean

Returns:

  • (Boolean)


334
# File 'lib/roby/distributed/connection_space.rb', line 334

# True if Roby::Distributed is among the owners of +object+
def owns?(object)
  owners = object.owners
  owners.include?(Roby::Distributed)
end

#prepare_remove_owner(object, peer) ⇒ Object



24
25
26
27
28
# File 'lib/roby/distributed/peer.rb', line 24

# Calls object.prepare_remove_owner(peer). Any exception it raises is
# returned instead of being propagated.
def prepare_remove_owner(object, peer)
  begin
    object.prepare_remove_owner(peer)
  rescue Exception => error
    error
  end
end

#quitObject

Make the ConnectionSpace quit



465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/roby/distributed/connection_space.rb', line 465

# Shuts the ConnectionSpace down. It does the following:
# * withdraws our entry from the central tuplespace (best effort);
# * interrupts the neighbour discovery thread and waits for it;
# * closes the server socket;
# * unregisters itself from Roby::Control.finalizers;
# * resets Distributed.state if it pointed to us.
def quit
  Distributed.debug "ConnectionSpace #{self} quitting"

  if central_discovery?
    begin
      discovery_tuplespace.take [:droby, nil, remote_id], 0
    rescue DRb::DRbConnError, Rinda::RequestExpiredError
    end
  end

  # Interrupt the discovery thread while holding the mutex, then wait for
  # it outside of the lock
  discovery = synchronize do
    current = @discovery_thread
    current.raise(Interrupt, "forcing discovery thread quit") if current
    current
  end
  discovery.join if discovery

ensure
  begin
    server_socket.close if server_socket
  rescue IOError
  end

  Roby::Control.finalizers.delete(method(:quit))
  Distributed.state = nil if Distributed.state == self
end

#receiveObject

Starts the reception thread



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/roby/distributed/connection_space.rb', line 260

# Starts the reception thread, shared by all peers.
#
# The thread waits (IO.select, 0.1s timeout) on the sockets registered
# through #pending_sockets. From each readable socket it reads an 8-byte
# header unpacked as two 32-bit big-endian integers, the second one being
# the payload size, then the payload, which is Marshal-loaded and pushed as
# [peer, message] on Roby::Distributed.cycles_rx.
#
# Closed, reset or truncated sockets are handed back to their peer. A
# connected peer is asked to reconnect; a disconnecting peer is marked as
# disconnected.
def receive # :nodoc:
  sockets = Hash.new
  Thread.new do
    loop do
      begin
        # Register the sockets of the newly connected peers
        until pending_sockets.empty?
          new_socket, new_peer = pending_sockets.shift
          sockets[new_socket] = new_peer
          begin
            Roby::Distributed.info "listening to #{new_socket.peer_info} for #{new_peer}"
          rescue IOError
          end
        end

        # Reset at each iteration so that a failed select never makes us
        # process (and block on) the result of a previous one
        read = nil
        begin
          sockets.delete_if { |s, p| s.closed? && p.disconnected? }
          read, = select(sockets.keys, nil, nil, 0.1)
        rescue IOError
          # select raises if one of the sockets has been closed under our
          # feet: process the closed ones below so that their peers get
          # reconnected or marked as disconnected
          read = sockets.keys.select(&:closed?)
        end
        next unless read

        closed_sockets = []
        read.each do |socket|
          if socket.closed?
            closed_sockets << socket
            next
          end

          begin
            header = socket.read(8)
            unless header && header.size == 8
              closed_sockets << socket
              next
            end

            _id, size = header.unpack("NN")
            data = socket.read(size)
            # A short read means the connection got closed in the middle
            # of a message
            unless data && data.size == size
              closed_sockets << socket
              next
            end

            peer = sockets[socket]
            peer.stats.rx += (size + 8)
            # SECURITY: Marshal.load on network data can instantiate
            # arbitrary objects; only acceptable between trusted peers
            Roby::Distributed.cycles_rx << [peer, Marshal.load(data)]
          rescue Errno::ECONNRESET, IOError
            closed_sockets << socket
          end
        end

        closed_sockets.each do |socket|
          peer = sockets[socket]
          if peer.connected?
            Roby::Distributed.info "lost connection with #{peer}"
            peer.reconnect
            sockets.delete socket
          elsif peer.disconnecting?
            Roby::Distributed.info "#{peer} disconnected"
            peer.disconnected
          end
        end

      rescue Exception
        Roby::Distributed.fatal "error in ConnectionSpace#receive: #{$!.full_message}"
      end
    end
  end
end

#remove_owner(object, peer) ⇒ Object



21
22
23
# File 'lib/roby/distributed/peer.rb', line 21

# Forwards to object.remove_owner, passing false as second argument
def remove_owner(object, peer)
  object.remove_owner(peer, false)
end

#ring_discovery?Boolean

If we are doing discovery based on Rinda::RingFinger

Returns:

  • (Boolean)


123
# File 'lib/roby/distributed/connection_space.rb', line 123

# Whether discovery uses Rinda::RingFinger (the :ring_discovery option,
# returned as given)
def ring_discovery?
  @ring_discovery
end

#start_keeperObject

Disables the keeper thread: this class does the cleanup itself



502
# File 'lib/roby/distributed/connection_space.rb', line 502

# No-op: the keeper thread is disabled, cleanup is done by this class
def start_keeper
  nil
end

#start_neighbour_discovery(block = false) ⇒ Object

Starts one neighbour discovery loop



435
436
437
438
439
440
441
442
443
444
445
# File 'lib/roby/distributed/connection_space.rb', line 435

# Requests a new discovery pass from the discovery thread. If +block+ is
# true, waits for that pass to finish. Raises RuntimeError if the discovery
# thread is not running.
def start_neighbour_discovery(block = false)
  synchronize do
    thread = discovery_thread
    raise "no discovery thread" unless thread && thread.alive?

    @discovery_start = Time.now
    start_discovery.signal
  end
  wait_discovery if block
end

#synchronizeObject



140
# File 'lib/roby/distributed/connection_space.rb', line 140

# Runs the given block with #mutex held and returns its value
def synchronize
  mutex.synchronize do
    yield
  end
end

#transaction_abandon_commit(trsc, reason) ⇒ Object

This makes ConnectionSpace act as a PeerServer object locally



509
510
511
# File 'lib/roby/distributed/connection_space.rb', line 509

# PeerServer-like hook: forwards to trsc.abandoned_commit(reason)
def transaction_abandon_commit(trsc, reason) # :nodoc:
  result = trsc.abandoned_commit(reason)
  result
end

#transaction_commit(trsc) ⇒ Object

This makes ConnectionSpace act as a PeerServer object locally



513
514
515
# File 'lib/roby/distributed/connection_space.rb', line 513

# PeerServer-like hook: forwards to trsc.commit_transaction(false)
def transaction_commit(trsc) # :nodoc:
  result = trsc.commit_transaction(false)
  result
end

#transaction_discard(trsc) ⇒ Object

This makes ConnectionSpace act as a PeerServer object locally



517
518
519
# File 'lib/roby/distributed/connection_space.rb', line 517

# PeerServer-like hook: forwards to trsc.discard_transaction(false)
def transaction_discard(trsc) # :nodoc:
  result = trsc.discard_transaction(false)
  result
end

#transaction_prepare_commit(trsc) ⇒ Object

This makes ConnectionSpace act as a PeerServer object locally



505
506
507
# File 'lib/roby/distributed/connection_space.rb', line 505

# PeerServer-like hook: true if +trsc+ is NOT a valid transaction, false
# otherwise
def transaction_prepare_commit(trsc) # :nodoc:
  valid = trsc.valid_transaction?
  !valid
end

#wait_discoveryObject



447
448
449
450
451
# File 'lib/roby/distributed/connection_space.rb', line 447

# If a discovery pass is in progress, waits once on #finished_discovery.
# Returns the value of #discovering?.
def wait_discovery
  discovering? { finished_discovery.wait(mutex) }
end

#wait_next_discoveryObject



452
453
454
455
456
457
458
459
# File 'lib/roby/distributed/connection_space.rb', line 452

# Waits for the next time #finished_discovery is signalled. Raises
# RuntimeError if the discovery thread is not running.
def wait_next_discovery
  synchronize do
    thread = discovery_thread
    raise "no discovery thread" unless thread && thread.alive?

    finished_discovery.wait(mutex)
  end
end