Method: Roby::Distributed.call_peers

Defined in:
lib/roby/distributed/distributed_object.rb

.call_peers(calling, m, *args) ⇒ Object

Calls args on all peers and returns a { peer => return_value } hash of all the values returned by each peer



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/roby/distributed/distributed_object.rb', line 91

def self.call_peers(calling, m, *args)
    Distributed.debug { "distributed call of #{m}(#{args}) on #{calling}" }

    # This is a tricky procedure. Let's describe what is done here:
    # * we send the required message to the peers listed in +calling+,
    #   and wait for all of them to have finished
    # * since there is a coordination requirement, once a peer have
    #   processed its call we stop processing any of the messages it 
    #   sends. We therefore block the RX thread of this peer using 
    #   the block_communication condition variable

    result              = Hash.new
    call_local          = calling.include?(Distributed)
    synchro, mutex      = Roby.condition_variable(true)

    mutex.synchronize do
  waiting_for = calling.size
  waiting_for -= 1 if call_local

  calling.each do |peer| 
      next if peer == Distributed

      callback = Proc.new do |peer_result|
    mutex.synchronize do
        result[peer] = peer.local_object(peer_result)
        waiting_for -= 1
        Distributed.debug { "reply for #{m}(#{args.join(", ")}) from #{peer}, #{waiting_for} remaining" }
        if waiting_for == 0
      synchro.broadcast
        end
        peer.disable_rx
    end
      end
      peer.queue_call false, m, args, callback, Thread.current
  end

  unless waiting_for == 0
      Distributed.debug "waiting for our peers to complete the call"
      synchro.wait(mutex) 
  end
    end

    if call_local
  Distributed.debug "processing locally ..."
  result[Distributed] = Distributed.call(m, *args)
    end
    result

ensure
    for peer in calling
  peer.enable_rx if peer != Distributed
    end
    Roby.return_condition_variable(synchro, mutex)
end