Module: Roby::Distributed
- Extended by:
- Logger::Forward, Logger::Hierarchy
- Included in:
- Test
- Defined in:
- lib/roby/distributed.rb,
lib/roby/distributed/drb.rb,
lib/roby/distributed/base.rb,
lib/roby/distributed/peer.rb,
lib/roby/test/distributed.rb,
lib/roby/distributed/proxy.rb,
lib/roby/distributed/protocol.rb,
lib/roby/distributed/transaction.rb,
lib/roby/distributed/subscription.rb,
lib/roby/distributed/communication.rb,
lib/roby/distributed/notifications.rb,
lib/roby/distributed/connection_space.rb,
lib/roby/distributed/distributed_object.rb,
ext/droby/dump.cc
Overview
Communication protocol (and code namespace structure)
Getting remote objects
There is actually two ways to get a remote object
-
the object has been explicitely subscribed to by calling Peer#subscribe
-
the object has been sent to us because it is linked to an object we own or an object we are subscribed to
In the first case, the object must be referenced in the first place. It can have been sent to us as a query result (see Query), or because it has been involved in a distributed transaction. In the second case, it is either us which have added the relation, or the remote peer. If it is us, we should have subscribed to the object, added the relation, and then we may unsubscribe to the object.
We forget about a remote object when Plan#garbage_collect removes it.
Subscription management
The pDB gets updated about all objects it is subscribed to.
Defined Under Namespace
Modules: DistributedObject, EventNotifications, PlanModificationHooks, RelationModificationHooks, TaskArgumentsNotifications, TaskNotifications, Test Classes: CallSpec, CallbackProcessingError, ConnectionError, ConnectionFailed, ConnectionFailedError, ConnectionSpace, ConnectionTask, DRobyConstant, DRobyModel, DRobyTaskModel, DisconnectedError, InvalidRemoteOperation, InvalidRemoteTaskOperation, Neighbour, NotAliveError, NotEditor, NotReady, Peer, PeerServer, RecursiveCallbacksError, RemoteID, RingServer, Transaction
Constant Summary collapse
- DISCOVERY_RING_PORT =
48901- DEFAULT_DROBY_PORT =
48902- DEBUG_MARSHALLING =
If set to true, enable some consistency-checking code in the communication code.
false
Class Attribute Summary collapse
-
.cycles_rx ⇒ Object
readonly
The queue of cycles read by ConnectionSpace#receive and not processed.
-
.keep ⇒ Object
readonly
The set of objects we should temporarily keep because they are used in a callback mechanism (like a remote query or a trigger).
-
.new_neighbours_observers ⇒ Object
readonly
The set of proc objects which should be notified when new neighbours are detected.
-
.pending_cycles ⇒ Object
readonly
The set of cycles that have been read from #pending_cycles but have not been processed yet because the peers have disabled_rx? set.
-
.removed_objects ⇒ Object
readonly
The set of objects that have been removed locally, but for which there are still references on our peers.
-
.server ⇒ Object
readonly
The RingServer object through which we publish this plan manager on the network.
-
.state ⇒ Object
The one and only ConnectionSpace object.
-
.transaction_handler ⇒ Object
The block which is called when a new transaction has been proposed to us.
-
.updated_objects ⇒ Object
readonly
The list of objects that are being updated because of remote update.
Class Method Summary collapse
-
.call(*args) ⇒ Object
Execute the given message and wait for its result to be available.
-
.call_peers(calling, m, *args) ⇒ Object
Calls
argson all peers and returns a { peer => return_value } hash of all the values returned by each peer. -
.clean_triggered(object) ⇒ Object
Remove
objectsfrom the sets of already-triggered objects. -
.droby_dump(dest = nil) ⇒ Object
Returns a Peer::DRoby object which can be used in the dRoby connection to represent this plan manager.
-
.each_object_relation(object) ⇒ Object
Yields the relations of
objectwhich are to be distributed among peers. -
.each_updated_peer(*objects) ⇒ Object
Yields the peers which are interested in at least one of the objects in
objects. -
.format(object, peer) ⇒ Object
Formats
objectso that it is ready to be dumped by Marshal.dump for sending topeer. -
.ignore! ⇒ Object
Called in PeerServer messages handlers to completely ignore the message which is currently being processed.
-
.neighbours ⇒ Object
The list of known neighbours.
-
.new_neighbours ⇒ Object
The list of neighbours that have been found since the last execution cycle.
-
.notify_new_neighbours ⇒ Object
Called in the neighbour discovery thread to detect new neighbours.
-
.on_neighbour ⇒ Object
Defines a block which should be called when a new neighbour is detected.
-
.on_transaction(&block) ⇒ Object
Sets up the transaction handler.
-
.owns?(object) ⇒ Boolean
True if this plan manager owns
object. -
.peer(id) ⇒ Object
Returns the Peer object for the given ID.
-
.peers ⇒ Object
The list of known peers.
-
.process_cycle(peer, calls) ⇒ Object
Process once cycle worth of data from the given peer.
-
.process_pending ⇒ Object
Extract data received so far from our peers and replays it if possible.
-
.publish(options = {}) ⇒ Object
Enable ring discovery on our part.
-
.published? ⇒ Boolean
True if we are published on the network.
-
.relations_of(object) ⇒ Object
call-seq: relations_of(object) => relations relations_of(object) { |object| … } => relations.
-
.remote_id ⇒ Object
Returns a RemoteID object suitable to represent this plan manager on the network.
-
.remotely_useful_objects(candidates, include_subscriptions_relations, result = nil) ⇒ Object
Compute the subset of
candidatesthat are to be considered as useful because of our peers and returns it. -
.RemoteProxyModel(object_model) ⇒ Object
Builds a remote proxy model for
object_model. -
.subgraph_of(objects) ⇒ Object
Returns the set of edges for which both sides are in
objects. -
.subscribed?(object) ⇒ Boolean
True if this plan manager is subscribed to
object. -
.transmit(*args) ⇒ Object
Execute the given message without blocking.
-
.trigger(*objects) ⇒ Object
This method will call PeerServer#trigger on all peers, for the objects in
objectswhich are eligible for triggering. -
.unpublish ⇒ Object
Disable the ring discovery on our part.
-
.update(object) ⇒ Object
Call the block with the objects in
objectsadded to the updated_objects set. -
.update_all(objects) ⇒ Object
Call the block with the objects in
objectsadded to the updated_objects set. -
.updating?(object) ⇒ Boolean
True if we are updating
object. -
.updating_all?(objects) ⇒ Boolean
True if we are updating all objects in
objects.
Class Attribute Details
.cycles_rx ⇒ Object (readonly)
The queue of cycles read by ConnectionSpace#receive and not processed
337 338 339 |
# File 'lib/roby/distributed/base.rb', line 337 def cycles_rx @cycles_rx end |
.keep ⇒ Object (readonly)
The set of objects we should temporarily keep because they are used in a callback mechanism (like a remote query or a trigger)
209 210 211 |
# File 'lib/roby/distributed/base.rb', line 209 def keep @keep end |
.new_neighbours_observers ⇒ Object (readonly)
The set of proc objects which should be notified when new neighbours are detected.
572 573 574 |
# File 'lib/roby/distributed/connection_space.rb', line 572 def new_neighbours_observers @new_neighbours_observers end |
.pending_cycles ⇒ Object (readonly)
The set of cycles that have been read from #pending_cycles but have not been processed yet because the peers have disabled_rx? set
This variable must be accessed only in the control thread
342 343 344 |
# File 'lib/roby/distributed/base.rb', line 342 def pending_cycles @pending_cycles end |
.removed_objects ⇒ Object (readonly)
The set of objects that have been removed locally, but for which there are still references on our peers
328 329 330 |
# File 'lib/roby/distributed/base.rb', line 328 def removed_objects @removed_objects end |
.server ⇒ Object (readonly)
The RingServer object through which we publish this plan manager on the network
525 526 527 |
# File 'lib/roby/distributed/connection_space.rb', line 525 def server @server end |
.state ⇒ Object
The one and only ConnectionSpace object
190 191 192 |
# File 'lib/roby/distributed/base.rb', line 190 def state @state end |
.transaction_handler ⇒ Object
The block which is called when a new transaction has been proposed to us.
9 10 11 |
# File 'lib/roby/distributed/transaction.rb', line 9 def transaction_handler @transaction_handler end |
.updated_objects ⇒ Object (readonly)
The list of objects that are being updated because of remote update
273 274 275 |
# File 'lib/roby/distributed/base.rb', line 273 def updated_objects @updated_objects end |
Class Method Details
.call(*args) ⇒ Object
Execute the given message and wait for its result to be available.
This makes Roby::Distributed behave like a Peer object
81 82 83 84 85 |
# File 'lib/roby/distributed/connection_space.rb', line 81 def self.call(*args) Roby.execute do Distributed.state.send(*args) end end |
.call_peers(calling, m, *args) ⇒ Object
Calls args on all peers and returns a { peer => return_value } hash of all the values returned by each peer
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/roby/distributed/distributed_object.rb', line 91 def self.call_peers(calling, m, *args) Distributed.debug { "distributed call of #{m}(#{args}) on #{calling}" } # This is a tricky procedure. Let's describe what is done here: # * we send the required message to the peers listed in +calling+, # and wait for all of them to have finished # * since there is a coordination requirement, once a peer have # processed its call we stop processing any of the messages it # sends. We therefore block the RX thread of this peer using # the block_communication condition variable result = Hash.new call_local = calling.include?(Distributed) synchro, mutex = Roby.condition_variable(true) mutex.synchronize do waiting_for = calling.size waiting_for -= 1 if call_local calling.each do |peer| next if peer == Distributed callback = Proc.new do |peer_result| mutex.synchronize do result[peer] = peer.local_object(peer_result) waiting_for -= 1 Distributed.debug { "reply for #{m}(#{args.join(", ")}) from #{peer}, #{waiting_for} remaining" } if waiting_for == 0 synchro.broadcast end peer.disable_rx end end peer.queue_call false, m, args, callback, Thread.current end unless waiting_for == 0 Distributed.debug "waiting for our peers to complete the call" synchro.wait(mutex) end end if call_local Distributed.debug "processing locally ..." result[Distributed] = Distributed.call(m, *args) end result ensure for peer in calling peer.enable_rx if peer != Distributed end Roby.return_condition_variable(synchro, mutex) end |
.clean_triggered(object) ⇒ Object
Remove objects from the sets of already-triggered objects. So, next time object will be tested for triggers, it will re-match the triggers it has already matched.
86 87 88 89 90 91 92 |
# File 'lib/roby/distributed/peer.rb', line 86 def clean_triggered(object) peers.each_value do |peer| peer.local_server.triggers.each_value do |_, triggered| triggered.delete object end end end |
.droby_dump(dest = nil) ⇒ Object
Returns a Peer::DRoby object which can be used in the dRoby connection to represent this plan manager.
This makes Roby::Distributed behave like a Peer object
61 62 63 64 65 |
# File 'lib/roby/distributed/connection_space.rb', line 61 def self.droby_dump(dest = nil) if state then state.droby_dump(dest) else @__single_marshalled_peer__ ||= Peer::DRoby.new('single', remote_id) end end |
.each_object_relation(object) ⇒ Object
Yields the relations of object which are to be distributed among peers.
313 314 315 316 317 |
# File 'lib/roby/distributed/base.rb', line 313 def each_object_relation(object) object.each_relation do |rel| yield(rel) if rel.distribute? end end |
.each_updated_peer(*objects) ⇒ Object
Yields the peers which are interested in at least one of the objects in objects.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/roby/distributed/subscription.rb', line 7 def each_updated_peer(*objects) for obj in objects return if !obj.distribute? end for _, peer in Distributed.peers next unless peer.connected? for obj in objects if obj.update_on?(peer) yield(peer) break end end end end |
.format(object, peer) ⇒ Object
Formats object so that it is ready to be dumped by Marshal.dump for sending to peer. This means that if the object has a droby_dump method, it is called to get a marshallable object which represents object. Moreover, if peer responds to #incremental_dump?(object), this is called to determine wether a full dump is required or if sending a Roby::Distributed::RemoteID for remote reference is enough.
If the object is not a DRbObject and does not define a #droby_dump method, it is proxied through a DRbObject if it present in Distributed.allow_remote_access. Otherwise, we will try to dump it as-is.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'ext/droby/dump.cc', line 33 static VALUE droby_format(int argc, VALUE* argv, VALUE self) { VALUE object, destination; rb_scan_args(argc, argv, "11", &object, &destination); if (RTEST(rb_obj_is_kind_of(object, cDRbObject))) return object; if (RTEST(rb_respond_to(object, id_droby_dump))) { if (!NIL_P(destination) && RTEST(rb_funcall(destination, rb_intern("incremental_dump?"), 1, object))) return rb_funcall(object, id_remote_id, 0); return rb_funcall(object, id_droby_dump, 1, destination); } VALUE remote_access = rb_iv_get(self, "@allowed_remote_access"); int i; for (i = 0; i < RARRAY(remote_access)->len; ++i) { if (rb_obj_is_kind_of(object, RARRAY(remote_access)->ptr[i])) return rb_class_new_instance(1, &object, cDRbObject); } return object; } |
.ignore! ⇒ Object
Called in PeerServer messages handlers to completely ignore the message which is currently being processed
464 465 466 |
# File 'lib/roby/distributed/communication.rb', line 464 def self.ignore! throw :ignore_this_call end |
.neighbours ⇒ Object
The list of known neighbours. See ConnectionSpace#neighbours
553 554 555 556 557 |
# File 'lib/roby/distributed/connection_space.rb', line 553 def neighbours if state then state.neighbours else [] end end |
.new_neighbours ⇒ Object
The list of neighbours that have been found since the last execution cycle
561 562 563 564 565 |
# File 'lib/roby/distributed/connection_space.rb', line 561 def new_neighbours if state then state.new_neighbours else [] end end |
.notify_new_neighbours ⇒ Object
Called in the neighbour discovery thread to detect new neighbours. It fills the new_neighbours queue which is read by notify_new_neighbours to notify application code of new neighbours in the control thread
578 579 580 581 582 583 584 585 586 |
# File 'lib/roby/distributed/connection_space.rb', line 578 def notify_new_neighbours return unless Distributed.state while !new_neighbours.empty? cs, neighbour = new_neighbours.pop(true) new_neighbours_observers.each do |obs| obs[cs, neighbour] end end end |
.on_neighbour ⇒ Object
Defines a block which should be called when a new neighbour is detected
590 591 592 593 594 |
# File 'lib/roby/distributed/connection_space.rb', line 590 def on_neighbour current = neighbours.dup Roby::Control.once { current.each { |n| yield(n) } } new_neighbours_observers << lambda { |_, n| yield(n) } end |
.on_transaction(&block) ⇒ Object
Sets up the transaction handler. The given block will be called in a separate thread whenever a remote peer proposes a new transaction
14 15 16 |
# File 'lib/roby/distributed/transaction.rb', line 14 def on_transaction(&block) Distributed.transaction_handler = block end |
.owns?(object) ⇒ Boolean
True if this plan manager owns object
205 |
# File 'lib/roby/distributed/base.rb', line 205 def owns?(object); !state || state.owns?(object) end |
.peer(id) ⇒ Object
Returns the Peer object for the given ID. id can be either the peer RemoteID or its name.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/roby/distributed/connection_space.rb', line 32 def self.peer(id) if id.kind_of?(Distributed::RemoteID) if id == remote_id Distributed else peers[id] end elsif id.respond_to?(:to_str) peers.each_value { |p| return p if p.remote_name == id.to_str } nil else nil end end |
.peers ⇒ Object
The list of known peers. See ConnectionSpace#peers
320 321 322 323 324 |
# File 'lib/roby/distributed/base.rb', line 320 def peers if state then state.peers else {} end end |
.process_cycle(peer, calls) ⇒ Object
Process once cycle worth of data from the given peer.
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 |
# File 'lib/roby/distributed/base.rb', line 373 def self.process_cycle(peer, calls) from = Time.now calls_size = calls.size peer_server = peer.local_server peer_server.processing = true if !peer.connected? return end while call_spec = calls.shift return unless call_spec is_callback, method, args, critical, = *call_spec Distributed.debug do args_s = args.map { |obj| obj ? obj.to_s : 'nil' } "processing #{is_callback ? 'callback' : 'method'} [#{message_id}]#{method}(#{args_s.join(", ")})" end result = catch(:ignore_this_call) do peer_server.queued_completion = false peer_server. = peer_server.processing_callback = !!is_callback result = begin peer_server.send(method, *args) rescue Exception => e if critical peer.fatal_error e, method, args else peer_server.completed!(e, true) end end if !peer.connected? return end result end if method != :completed && method != :completion_group && !peer.disconnecting? && !peer.disconnected? if peer_server.queued_completion? Distributed.debug "done and already queued the completion message" else Distributed.debug { "done, returns #{result || 'nil'}" } peer.queue_call false, :completed, [result, false, ] end end if peer.disabled_rx? return calls end end Distributed.debug "successfully served #{calls_size} calls in #{Time.now - from} seconds" nil rescue Exception => e Distributed.info "error in dRoby processing: #{e.full_message}" peer.disconnect ensure peer_server.processing = false end |
.process_pending ⇒ Object
Extract data received so far from our peers and replays it if possible. Data can be ignored if RX is disabled with this peer (through Peer#disable_rx), or delayed if there is event propagation involved. In that last case, the events will be fired at the beginning of the next execution cycle and the remaining messages at the end of that same cycle.
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/roby/distributed/base.rb', line 351 def self.process_pending delayed_cycles = [] while !(pending_cycles.empty? && cycles_rx.empty?) peer, calls = if pending_cycles.empty? cycles_rx.pop else pending_cycles.shift end if peer.disabled_rx? delayed_cycles.push [peer, calls] else if remaining = process_cycle(peer, calls) delayed_cycles.push [peer, remaining] end end end ensure @pending_cycles = delayed_cycles end |
.publish(options = {}) ⇒ Object
Enable ring discovery on our part. A RingServer object is set up to listen to connections on the port given as a :port option (or DISCOVERY_RING_PORT if none is specified).
Note that all plan managers must use the same discovery port.
537 538 539 540 541 |
# File 'lib/roby/distributed/connection_space.rb', line 537 def publish( = {}) [:port] ||= DISCOVERY_RING_PORT @server = RingServer.new(state, ) Distributed.info "listening for distributed discovery on #{options[:port]}" end |
.published? ⇒ Boolean
True if we are published on the network.
See #server, #publish and #unpublish
530 |
# File 'lib/roby/distributed/connection_space.rb', line 530 def published?; !!@server end |
.relations_of(object) ⇒ Object
call-seq:
relations_of(object) => relations
relations_of(object) { |object| ... } => relations
Relations to be sent to the remote host if object is in a plan. The returned array if formatted as
[ [graph, parents, children], [graph, ..] ]
where parents is the set of parents of objects in graph and children the set of children
parents and children are formatted as
- object, info, object, info, …
-
If a block is given, a new parent or child is added only if the block returns true
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/roby/distributed/notifications.rb', line 38 def self.relations_of(object) result = [] # For transaction proxies, never send non-discovered relations to # remote hosts Roby::Distributed.each_object_relation(object) do |graph| next unless graph.distribute? parents = [] object.each_parent_object(graph) do |parent| next unless parent.distribute? next unless yield(parent) if block_given? parents << parent << parent[object, graph] end children = [] object.each_child_object(graph) do |child| next unless child.distribute? next unless yield(child) if block_given? children << child << object[child, graph] end result << graph << parents << children end result end |
.remote_id ⇒ Object
Returns a RemoteID object suitable to represent this plan manager on the network.
This makes Roby::Distributed behave like a Peer object
51 52 53 54 55 |
# File 'lib/roby/distributed/connection_space.rb', line 51 def self.remote_id if state then state.remote_id else @__single_remote_id__ ||= RemoteID.new('local', 0) end end |
.remotely_useful_objects(candidates, include_subscriptions_relations, result = nil) ⇒ Object
Compute the subset of candidates that are to be considered as useful because of our peers and returns it.
More specifically, an object will be included in the result if:
-
this plan manager is subscribed to it
-
the object is directly related to a self-owned object
-
if
include_subscriptions_relationsis true,objectis directly related to a subscribed object.
The method takes into account plan children in its computation: for instance, a task will be included in the result if one of its events meet the requirements described above.
If result is non-nil, the method adds the objects to result using #<< and returns it.
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/roby/distributed/base.rb', line 226 def remotely_useful_objects(candidates, include_subscriptions_relations, result = nil) return ValueSet.new if candidates.empty? result ||= Distributed.keep.referenced_objects.to_value_set child_set = ValueSet.new for obj in candidates if result.include?(obj.root_object) next elsif obj.subscribed? result << obj next end not_found = obj.each_relation do |rel| next unless rel.distribute? && rel.root_relation? not_found = obj.each_parent_object(rel) do |parent| parent = parent.root_object if parent.distribute? && ((include_subscriptions_relations && parent.subscribed?) || parent.self_owned?) result << obj.root_object break end end break unless not_found not_found = obj.each_child_object(rel) do |child| child = child.root_object if child.distribute? && ((include_subscriptions_relations && child.subscribed?) || child.self_owned?) result << obj.root_object break end end break unless not_found end if not_found && obj.respond_to?(:each_plan_child) obj.each_plan_child { |plan_child| child_set << plan_child } end end result.merge remotely_useful_objects(child_set, false, result) end |
.RemoteProxyModel(object_model) ⇒ Object
Builds a remote proxy model for object_model. object_model is either a string or a class. In the first case, it is interpreted as a constant name.
337 338 339 |
# File 'lib/roby/distributed/proxy.rb', line 337 def self.RemoteProxyModel(object_model) object_model end |
.subgraph_of(objects) ⇒ Object
Returns the set of edges for which both sides are in objects. The set if formatted as [object, relations, …] where relations is the output of relations_of
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/roby/distributed/notifications.rb', line 6 def self.subgraph_of(objects) return [] if objects.size < 2 relations = [] objects = objects.dup objects.delete_if do |obj| obj_relations = relations_of(obj) do || objects.include?() end relations << obj << obj_relations true end relations end |
.subscribed?(object) ⇒ Boolean
True if this plan manager is subscribed to object
This makes Roby::Distributed behave like a Peer object
90 91 92 |
# File 'lib/roby/distributed/connection_space.rb', line 90 def self.subscribed?(object) object.subscribed? end |
.transmit(*args) ⇒ Object
Execute the given message without blocking. If a block is given, yield the result to that block.
This makes Roby::Distributed behave like a Peer object
71 72 73 74 75 76 |
# File 'lib/roby/distributed/connection_space.rb', line 71 def self.transmit(*args) Roby::Control.once do result = Distributed.state.send(*args) yield(result) if block_given? end end |
.trigger(*objects) ⇒ Object
This method will call PeerServer#trigger on all peers, for the objects in objects which are eligible for triggering.
The same task cannot match the same trigger twice. To allow that, call #clean_triggered.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/roby/distributed/peer.rb', line 68 def trigger(*objects) return unless Roby::Distributed.state objects.delete_if do |o| o.plan != Roby::Distributed.state.plan || !o.distribute? || !o.self_owned? end return if objects.empty? # If +object+ is a trigger, send the :triggered event but do *not* # act as if +object+ was subscribed peers.each_value do |peer| peer.local_server.trigger(*objects) end end |
.unpublish ⇒ Object
Disable the ring discovery on our part.
544 545 546 547 548 549 550 |
# File 'lib/roby/distributed/connection_space.rb', line 544 def unpublish if server server.close @server = nil Distributed.info "disabled distributed discovery" end end |
.update(object) ⇒ Object
Call the block with the objects in objects added to the updated_objects set
297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/roby/distributed/base.rb', line 297 def update(object) if object.respond_to?(:__getobj__) && !object.kind_of?(Roby::Transactions::Proxy) object = object.__getobj__ end included = unless updated_objects.include?(object) @updated_objects << object end yield ensure @updated_objects.delete(object) if included end |
.update_all(objects) ⇒ Object
Call the block with the objects in objects added to the updated_objects set
287 288 289 290 291 292 293 |
# File 'lib/roby/distributed/base.rb', line 287 def update_all(objects) old_updated_objects = @updated_objects @updated_objects |= objects.to_value_set yield ensure @updated_objects = old_updated_objects end |
.updating?(object) ⇒ Boolean
True if we are updating object
276 277 278 |
# File 'lib/roby/distributed/base.rb', line 276 def updating?(object) updated_objects.include?(object) end |
.updating_all?(objects) ⇒ Boolean
True if we are updating all objects in objects
281 282 283 |
# File 'lib/roby/distributed/base.rb', line 281 def updating_all?(objects) updated_objects.include_all?(objects.to_value_set) end |