Method: Roby::Distributed.call_peers
- Defined in:
- lib/roby/distributed/distributed_object.rb
.call_peers(calling, m, *args) ⇒ Object
Calls args on all peers and returns a { peer => return_value } hash of all the values returned by each peer
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/roby/distributed/distributed_object.rb', line 91 def self.call_peers(calling, m, *args) Distributed.debug { "distributed call of #{m}(#{args}) on #{calling}" } # This is a tricky procedure. Let's describe what is done here: # * we send the required message to the peers listed in +calling+, # and wait for all of them to have finished # * since there is a coordination requirement, once a peer have # processed its call we stop processing any of the messages it # sends. We therefore block the RX thread of this peer using # the block_communication condition variable result = Hash.new call_local = calling.include?(Distributed) synchro, mutex = Roby.condition_variable(true) mutex.synchronize do waiting_for = calling.size waiting_for -= 1 if call_local calling.each do |peer| next if peer == Distributed callback = Proc.new do |peer_result| mutex.synchronize do result[peer] = peer.local_object(peer_result) waiting_for -= 1 Distributed.debug { "reply for #{m}(#{args.join(", ")}) from #{peer}, #{waiting_for} remaining" } if waiting_for == 0 synchro.broadcast end peer.disable_rx end end peer.queue_call false, m, args, callback, Thread.current end unless waiting_for == 0 Distributed.debug "waiting for our peers to complete the call" synchro.wait(mutex) end end if call_local Distributed.debug "processing locally ..." result[Distributed] = Distributed.call(m, *args) end result ensure for peer in calling peer.enable_rx if peer != Distributed end Roby.return_condition_variable(synchro, mutex) end |