Module: Roby::Distributed

Extended by:
Logger::Forward, Logger::Hierarchy
Included in:
Test
Defined in:
lib/roby/distributed.rb,
lib/roby/distributed/drb.rb,
lib/roby/distributed/base.rb,
lib/roby/distributed/peer.rb,
lib/roby/test/distributed.rb,
lib/roby/distributed/proxy.rb,
lib/roby/distributed/protocol.rb,
lib/roby/distributed/transaction.rb,
lib/roby/distributed/subscription.rb,
lib/roby/distributed/communication.rb,
lib/roby/distributed/notifications.rb,
lib/roby/distributed/connection_space.rb,
lib/roby/distributed/distributed_object.rb,
ext/droby/dump.cc

Overview

Communication protocol (and code namespace structure)

Getting remote objects

There is actually two ways to get a remote object

  • the object has been explicitely subscribed to by calling Peer#subscribe

  • the object has been sent to us because it is linked to an object we own or an object we are subscribed to

In the first case, the object must be referenced in the first place. It can have been sent to us as a query result (see Query), or because it has been involved in a distributed transaction. In the second case, it is either us which have added the relation, or the remote peer. If it is us, we should have subscribed to the object, added the relation, and then we may unsubscribe to the object.

We forget about a remote object when Plan#garbage_collect removes it.

Subscription management

The pDB gets updated about all objects it is subscribed to.

Defined Under Namespace

Modules: DistributedObject, EventNotifications, PlanModificationHooks, RelationModificationHooks, TaskArgumentsNotifications, TaskNotifications, Test Classes: CallSpec, CallbackProcessingError, ConnectionError, ConnectionFailed, ConnectionFailedError, ConnectionSpace, ConnectionTask, DRobyConstant, DRobyModel, DRobyTaskModel, DisconnectedError, InvalidRemoteOperation, InvalidRemoteTaskOperation, Neighbour, NotAliveError, NotEditor, NotReady, Peer, PeerServer, RecursiveCallbacksError, RemoteID, RingServer, Transaction

Constant Summary collapse

DISCOVERY_RING_PORT =
48901
DEFAULT_DROBY_PORT =
48902
DEBUG_MARSHALLING =

If set to true, enable some consistency-checking code in the communication code.

false

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.cycles_rxObject (readonly)

The queue of cycles read by ConnectionSpace#receive and not processed



337
338
339
# File 'lib/roby/distributed/base.rb', line 337

def cycles_rx
  @cycles_rx
end

.keepObject (readonly)

The set of objects we should temporarily keep because they are used in a callback mechanism (like a remote query or a trigger)



209
210
211
# File 'lib/roby/distributed/base.rb', line 209

def keep
  @keep
end

.new_neighbours_observersObject (readonly)

The set of proc objects which should be notified when new neighbours are detected.



572
573
574
# File 'lib/roby/distributed/connection_space.rb', line 572

def new_neighbours_observers
  @new_neighbours_observers
end

.pending_cyclesObject (readonly)

The set of cycles that have been read from #pending_cycles but have not been processed yet because the peers have disabled_rx? set

This variable must be accessed only in the control thread



342
343
344
# File 'lib/roby/distributed/base.rb', line 342

def pending_cycles
  @pending_cycles
end

.removed_objectsObject (readonly)

The set of objects that have been removed locally, but for which there are still references on our peers



328
329
330
# File 'lib/roby/distributed/base.rb', line 328

def removed_objects
  @removed_objects
end

.serverObject (readonly)

The RingServer object through which we publish this plan manager on the network



525
526
527
# File 'lib/roby/distributed/connection_space.rb', line 525

def server
  @server
end

.stateObject

The one and only ConnectionSpace object



190
191
192
# File 'lib/roby/distributed/base.rb', line 190

def state
  @state
end

.transaction_handlerObject

The block which is called when a new transaction has been proposed to us.



9
10
11
# File 'lib/roby/distributed/transaction.rb', line 9

def transaction_handler
  @transaction_handler
end

.updated_objectsObject (readonly)

The list of objects that are being updated because of remote update



273
274
275
# File 'lib/roby/distributed/base.rb', line 273

def updated_objects
  @updated_objects
end

Class Method Details

.call(*args) ⇒ Object

Execute the given message and wait for its result to be available.

This makes Roby::Distributed behave like a Peer object



81
82
83
84
85
# File 'lib/roby/distributed/connection_space.rb', line 81

def self.call(*args)
    Roby.execute do
  Distributed.state.send(*args)
    end
end

.call_peers(calling, m, *args) ⇒ Object

Calls args on all peers and returns a { peer => return_value } hash of all the values returned by each peer



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/roby/distributed/distributed_object.rb', line 91

def self.call_peers(calling, m, *args)
    Distributed.debug { "distributed call of #{m}(#{args}) on #{calling}" }

    # This is a tricky procedure. Let's describe what is done here:
    # * we send the required message to the peers listed in +calling+,
    #   and wait for all of them to have finished
    # * since there is a coordination requirement, once a peer have
    #   processed its call we stop processing any of the messages it 
    #   sends. We therefore block the RX thread of this peer using 
    #   the block_communication condition variable

    result              = Hash.new
    call_local          = calling.include?(Distributed)
    synchro, mutex      = Roby.condition_variable(true)

    mutex.synchronize do
  waiting_for = calling.size
  waiting_for -= 1 if call_local

  calling.each do |peer| 
      next if peer == Distributed

      callback = Proc.new do |peer_result|
    mutex.synchronize do
        result[peer] = peer.local_object(peer_result)
        waiting_for -= 1
        Distributed.debug { "reply for #{m}(#{args.join(", ")}) from #{peer}, #{waiting_for} remaining" }
        if waiting_for == 0
      synchro.broadcast
        end
        peer.disable_rx
    end
      end
      peer.queue_call false, m, args, callback, Thread.current
  end

  unless waiting_for == 0
      Distributed.debug "waiting for our peers to complete the call"
      synchro.wait(mutex) 
  end
    end

    if call_local
  Distributed.debug "processing locally ..."
  result[Distributed] = Distributed.call(m, *args)
    end
    result

ensure
    for peer in calling
  peer.enable_rx if peer != Distributed
    end
    Roby.return_condition_variable(synchro, mutex)
end

.clean_triggered(object) ⇒ Object

Remove objects from the sets of already-triggered objects. So, next time object will be tested for triggers, it will re-match the triggers it has already matched.



86
87
88
89
90
91
92
# File 'lib/roby/distributed/peer.rb', line 86

def clean_triggered(object)
    peers.each_value do |peer|
  peer.local_server.triggers.each_value do |_, triggered|
      triggered.delete object
  end
    end
end

.droby_dump(dest = nil) ⇒ Object

Returns a Peer::DRoby object which can be used in the dRoby connection to represent this plan manager.

This makes Roby::Distributed behave like a Peer object



61
62
63
64
65
# File 'lib/roby/distributed/connection_space.rb', line 61

def self.droby_dump(dest = nil)
    if state then state.droby_dump(dest)
    else @__single_marshalled_peer__ ||= Peer::DRoby.new('single', remote_id)
    end
end

.each_object_relation(object) ⇒ Object

Yields the relations of object which are to be distributed among peers.



313
314
315
316
317
# File 'lib/roby/distributed/base.rb', line 313

def each_object_relation(object)
    object.each_relation do |rel|
 yield(rel) if rel.distribute?
    end
end

.each_updated_peer(*objects) ⇒ Object

Yields the peers which are interested in at least one of the objects in objects.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/roby/distributed/subscription.rb', line 7

def each_updated_peer(*objects)
    for obj in objects
 return if !obj.distribute?
    end

    for _, peer in Distributed.peers
 next unless peer.connected?
 for obj in objects
      if obj.update_on?(peer)
  yield(peer)
  break
      end
 end
    end
end

.format(object, peer) ⇒ Object

Formats object so that it is ready to be dumped by Marshal.dump for sending to peer. This means that if the object has a droby_dump method, it is called to get a marshallable object which represents object. Moreover, if peer responds to #incremental_dump?(object), this is called to determine wether a full dump is required or if sending a Roby::Distributed::RemoteID for remote reference is enough.

If the object is not a DRbObject and does not define a #droby_dump method, it is proxied through a DRbObject if it present in Distributed.allow_remote_access. Otherwise, we will try to dump it as-is.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'ext/droby/dump.cc', line 33

static VALUE droby_format(int argc, VALUE* argv, VALUE self)
{
    VALUE object, destination;
    rb_scan_args(argc, argv, "11", &object, &destination);

    if (RTEST(rb_obj_is_kind_of(object, cDRbObject)))
  return object;

    if (RTEST(rb_respond_to(object, id_droby_dump)))
    {
  if (!NIL_P(destination) && RTEST(rb_funcall(destination, rb_intern("incremental_dump?"), 1, object)))
      return rb_funcall(object, id_remote_id, 0);
  return rb_funcall(object, id_droby_dump, 1, destination);
    }

    VALUE remote_access = rb_iv_get(self, "@allowed_remote_access");
    int i;
    for (i = 0; i < RARRAY(remote_access)->len; ++i)
    {
  if (rb_obj_is_kind_of(object, RARRAY(remote_access)->ptr[i]))
      return rb_class_new_instance(1, &object, cDRbObject);
    }

    return object;
}

.ignore!Object

Called in PeerServer messages handlers to completely ignore the message which is currently being processed



464
465
466
# File 'lib/roby/distributed/communication.rb', line 464

def self.ignore!
    throw :ignore_this_call
end

.neighboursObject

The list of known neighbours. See ConnectionSpace#neighbours



553
554
555
556
557
# File 'lib/roby/distributed/connection_space.rb', line 553

def neighbours
    if state then state.neighbours
    else []
    end
end

.new_neighboursObject

The list of neighbours that have been found since the last execution cycle



561
562
563
564
565
# File 'lib/roby/distributed/connection_space.rb', line 561

def new_neighbours
    if state then state.new_neighbours
    else []
    end
end

.notify_new_neighboursObject

Called in the neighbour discovery thread to detect new neighbours. It fills the new_neighbours queue which is read by notify_new_neighbours to notify application code of new neighbours in the control thread



578
579
580
581
582
583
584
585
586
# File 'lib/roby/distributed/connection_space.rb', line 578

def notify_new_neighbours
    return unless Distributed.state
    while !new_neighbours.empty?
 cs, neighbour = new_neighbours.pop(true)
 new_neighbours_observers.each do |obs|
      obs[cs, neighbour]
 end
    end
end

.on_neighbourObject

Defines a block which should be called when a new neighbour is detected



590
591
592
593
594
# File 'lib/roby/distributed/connection_space.rb', line 590

def on_neighbour
    current = neighbours.dup
    Roby::Control.once { current.each { |n| yield(n) } }
    new_neighbours_observers << lambda { |_, n| yield(n) }
end

.on_transaction(&block) ⇒ Object

Sets up the transaction handler. The given block will be called in a separate thread whenever a remote peer proposes a new transaction



14
15
16
# File 'lib/roby/distributed/transaction.rb', line 14

def on_transaction(&block)
    Distributed.transaction_handler = block
end

.owns?(object) ⇒ Boolean

True if this plan manager owns object

Returns:



205
# File 'lib/roby/distributed/base.rb', line 205

def owns?(object); !state || state.owns?(object) end

.peer(id) ⇒ Object

Returns the Peer object for the given ID. id can be either the peer RemoteID or its name.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/roby/distributed/connection_space.rb', line 32

def self.peer(id)
    if id.kind_of?(Distributed::RemoteID)
  if id == remote_id
      Distributed
  else
      peers[id]
  end
    elsif id.respond_to?(:to_str)
  peers.each_value { |p| return p if p.remote_name == id.to_str }
  nil
    else
  nil
    end
end

.peersObject

The list of known peers. See ConnectionSpace#peers



320
321
322
323
324
# File 'lib/roby/distributed/base.rb', line 320

def peers
    if state then state.peers 
    else {}
    end
end

.process_cycle(peer, calls) ⇒ Object

Process once cycle worth of data from the given peer.



373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/roby/distributed/base.rb', line 373

def self.process_cycle(peer, calls)
    from = Time.now
    calls_size = calls.size

    peer_server = peer.local_server
    peer_server.processing = true

    if !peer.connected?
  return
    end

    while call_spec = calls.shift
  return unless call_spec

  is_callback, method, args, critical, message_id = *call_spec
  Distributed.debug do 
      args_s = args.map { |obj| obj ? obj.to_s : 'nil' }
    "processing #{is_callback ? 'callback' : 'method'} [#{message_id}]#{method}(#{args_s.join(", ")})"
  end

  result = catch(:ignore_this_call) do
      peer_server.queued_completion = false
      peer_server.current_message_id = message_id
      peer_server.processing_callback = !!is_callback

      result = begin
       peer_server.send(method, *args)
         rescue Exception => e
       if critical
           peer.fatal_error e, method, args
       else
           peer_server.completed!(e, true)
       end
         end

      if !peer.connected?
    return
      end
      result
  end

  if method != :completed && method != :completion_group && !peer.disconnecting? && !peer.disconnected?
      if peer_server.queued_completion?
    Distributed.debug "done and already queued the completion message"
      else
    Distributed.debug { "done, returns #{result || 'nil'}" }
    peer.queue_call false, :completed, [result, false, message_id]
      end
  end

  if peer.disabled_rx?
      return calls
  end

    end

    Distributed.debug "successfully served #{calls_size} calls in #{Time.now - from} seconds"
    nil

rescue Exception => e
    Distributed.info "error in dRoby processing: #{e.full_message}"
    peer.disconnect

ensure
    peer_server.processing = false
end

.process_pendingObject

Extract data received so far from our peers and replays it if possible. Data can be ignored if RX is disabled with this peer (through Peer#disable_rx), or delayed if there is event propagation involved. In that last case, the events will be fired at the beginning of the next execution cycle and the remaining messages at the end of that same cycle.



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/roby/distributed/base.rb', line 351

def self.process_pending
    delayed_cycles = []
    while !(pending_cycles.empty? && cycles_rx.empty?)
  peer, calls = if pending_cycles.empty?
        cycles_rx.pop
          else pending_cycles.shift
          end

  if peer.disabled_rx?
      delayed_cycles.push [peer, calls]
  else
      if remaining = process_cycle(peer, calls)
    delayed_cycles.push [peer, remaining]
      end
  end
    end

ensure
    @pending_cycles = delayed_cycles
end

.publish(options = {}) ⇒ Object

Enable ring discovery on our part. A RingServer object is set up to listen to connections on the port given as a :port option (or DISCOVERY_RING_PORT if none is specified).

Note that all plan managers must use the same discovery port.



537
538
539
540
541
# File 'lib/roby/distributed/connection_space.rb', line 537

def publish(options = {})
    options[:port] ||= DISCOVERY_RING_PORT
    @server = RingServer.new(state, options) 
    Distributed.info "listening for distributed discovery on #{options[:port]}"
end

.published?Boolean

True if we are published on the network.

See #server, #publish and #unpublish

Returns:



530
# File 'lib/roby/distributed/connection_space.rb', line 530

def published?; !!@server end

.relations_of(object) ⇒ Object

call-seq:

relations_of(object) => relations
relations_of(object) { |object| ... } => relations

Relations to be sent to the remote host if object is in a plan. The returned array if formatted as

[ [graph, parents, children], [graph, ..] ]

where parents is the set of parents of objects in graph and children the set of children

parents and children are formatted as

object, info, object, info, …

If a block is given, a new parent or child is added only if the block returns true



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/roby/distributed/notifications.rb', line 38

def self.relations_of(object)
    result = []
    # For transaction proxies, never send non-discovered relations to
    # remote hosts
    Roby::Distributed.each_object_relation(object) do |graph|
  next unless graph.distribute?
  parents = []
  object.each_parent_object(graph) do |parent|
      next unless parent.distribute?
      next unless yield(parent) if block_given?
      parents << parent << parent[object, graph]
  end
  children = []
  object.each_child_object(graph) do |child|
      next unless child.distribute?
      next unless yield(child) if block_given?
      children << child << object[child, graph]
  end
  result << graph << parents << children
    end

    result
end

.remote_idObject

Returns a RemoteID object suitable to represent this plan manager on the network.

This makes Roby::Distributed behave like a Peer object



51
52
53
54
55
# File 'lib/roby/distributed/connection_space.rb', line 51

def self.remote_id
    if state then state.remote_id
    else @__single_remote_id__ ||= RemoteID.new('local', 0)
    end
end

.remotely_useful_objects(candidates, include_subscriptions_relations, result = nil) ⇒ Object

Compute the subset of candidates that are to be considered as useful because of our peers and returns it.

More specifically, an object will be included in the result if:

  • this plan manager is subscribed to it

  • the object is directly related to a self-owned object

  • if include_subscriptions_relations is true, object is directly related to a subscribed object.

The method takes into account plan children in its computation: for instance, a task will be included in the result if one of its events meet the requirements described above.

If result is non-nil, the method adds the objects to result using #<< and returns it.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/roby/distributed/base.rb', line 226

def remotely_useful_objects(candidates, include_subscriptions_relations, result = nil)
    return ValueSet.new if candidates.empty?

    result  ||= Distributed.keep.referenced_objects.to_value_set

    child_set = ValueSet.new
    for obj in candidates
        if result.include?(obj.root_object)
      next
 elsif obj.subscribed?
      result << obj
      next
 end

 not_found = obj.each_relation do |rel|
      next unless rel.distribute? && rel.root_relation?

      not_found = obj.each_parent_object(rel) do |parent|
          parent = parent.root_object
          if parent.distribute? && 
        ((include_subscriptions_relations && parent.subscribed?) || parent.self_owned?)
        result << obj.root_object
        break
          end
      end
      break unless not_found

      not_found = obj.each_child_object(rel) do |child|
          child = child.root_object
          if child.distribute? && 
        ((include_subscriptions_relations && child.subscribed?) || child.self_owned?)
        result << obj.root_object
        break
          end
      end
      break unless not_found
        end

 if not_found && obj.respond_to?(:each_plan_child)
      obj.each_plan_child { |plan_child| child_set << plan_child }
 end
    end

    result.merge remotely_useful_objects(child_set, false, result)
end

.RemoteProxyModel(object_model) ⇒ Object

Builds a remote proxy model for object_model. object_model is either a string or a class. In the first case, it is interpreted as a constant name.



337
338
339
# File 'lib/roby/distributed/proxy.rb', line 337

def self.RemoteProxyModel(object_model)
    object_model
end

.subgraph_of(objects) ⇒ Object

Returns the set of edges for which both sides are in objects. The set if formatted as [object, relations, …] where relations is the output of relations_of



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/roby/distributed/notifications.rb', line 6

def self.subgraph_of(objects)
    return [] if objects.size < 2

    relations = []

    objects = objects.dup
    objects.delete_if do |obj|
  obj_relations = relations_of(obj) do |related_object| 
      objects.include?(related_object) 
  end
  relations << obj << obj_relations
  true
    end

    relations
end

.subscribed?(object) ⇒ Boolean

True if this plan manager is subscribed to object

This makes Roby::Distributed behave like a Peer object

Returns:



90
91
92
# File 'lib/roby/distributed/connection_space.rb', line 90

def self.subscribed?(object)
    object.subscribed?
end

.transmit(*args) ⇒ Object

Execute the given message without blocking. If a block is given, yield the result to that block.

This makes Roby::Distributed behave like a Peer object



71
72
73
74
75
76
# File 'lib/roby/distributed/connection_space.rb', line 71

def self.transmit(*args)
    Roby::Control.once do
  result = Distributed.state.send(*args)
  yield(result) if block_given?
    end
end

.trigger(*objects) ⇒ Object

This method will call PeerServer#trigger on all peers, for the objects in objects which are eligible for triggering.

The same task cannot match the same trigger twice. To allow that, call #clean_triggered.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/roby/distributed/peer.rb', line 68

def trigger(*objects)
    return unless Roby::Distributed.state 
    objects.delete_if do |o| 
  o.plan != Roby::Distributed.state.plan ||
      !o.distribute? ||
      !o.self_owned?
    end
    return if objects.empty?

    # If +object+ is a trigger, send the :triggered event but do *not*
    # act as if +object+ was subscribed
    peers.each_value do |peer|
  peer.local_server.trigger(*objects)
    end
end

.unpublishObject

Disable the ring discovery on our part.



544
545
546
547
548
549
550
# File 'lib/roby/distributed/connection_space.rb', line 544

def unpublish
    if server 
 server.close
 @server = nil
 Distributed.info "disabled distributed discovery"
    end
end

.update(object) ⇒ Object

Call the block with the objects in objects added to the updated_objects set



297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/roby/distributed/base.rb', line 297

def update(object)
    if object.respond_to?(:__getobj__) && !object.kind_of?(Roby::Transactions::Proxy)
 object = object.__getobj__
    end

    included = unless updated_objects.include?(object)
     @updated_objects << object
 end

    yield
ensure
    @updated_objects.delete(object) if included
end

.update_all(objects) ⇒ Object

Call the block with the objects in objects added to the updated_objects set



287
288
289
290
291
292
293
# File 'lib/roby/distributed/base.rb', line 287

def update_all(objects)
    old_updated_objects = @updated_objects
    @updated_objects |= objects.to_value_set
    yield
ensure
    @updated_objects = old_updated_objects
end

.updating?(object) ⇒ Boolean

True if we are updating object

Returns:



276
277
278
# File 'lib/roby/distributed/base.rb', line 276

def updating?(object)
    updated_objects.include?(object) 
end

.updating_all?(objects) ⇒ Boolean

True if we are updating all objects in objects

Returns:



281
282
283
# File 'lib/roby/distributed/base.rb', line 281

def updating_all?(objects)
    updated_objects.include_all?(objects.to_value_set)
end