Module: Roby::Distributed::Test
- Includes:
- Roby::Distributed, Test
- Defined in:
- lib/roby/test/distributed.rb
Defined Under Namespace
Modules: RemotePeerSupport
Constant Summary
Constants included from Roby::Distributed
DEBUG_MARSHALLING, DEFAULT_DROBY_PORT, DISCOVERY_RING_PORT
Constants included from Test
Test::ASSERT_ANY_EVENTS_TLS, Test::BASE_PORT, Test::DISCOVERY_SERVER, Test::LOCAL_PORT, Test::LOCAL_SERVER, Test::REMOTE_PORT, Test::REMOTE_SERVER, Test::Unit
Constants included from Roby
ROBY_LIB_DIR, ROBY_ROOT_DIR, RX_IN_FRAMEWORK, VERSION
Instance Attribute Summary collapse
-
#central_tuplespace ⇒ Object
readonly
Returns the value of attribute central_tuplespace.
-
#local ⇒ Object
readonly
Returns the value of attribute local.
-
#remote ⇒ Object
readonly
Returns the value of attribute remote.
-
#remote_peer ⇒ Object
readonly
Returns the value of attribute remote_peer.
-
#remote_plan ⇒ Object
readonly
Returns the value of attribute remote_plan.
Attributes included from Test
#console_logger, #original_collections, #remote_processes, #timings
Instance Method Summary collapse
-
#peer2peer(detached_control = false, &remote_init) ⇒ Object
Establishes a peer to peer connection between two ConnectionSpace objects.
- #process_events ⇒ Object
- #remote_server(&block) ⇒ Object
- #remote_task(match) ⇒ Object
- #setup ⇒ Object
- #setup_connection ⇒ Object
-
#start_peers(detached_control = false) ⇒ Object
Start a central discovery service, a remote connectionspace and a local connection space.
- #subscribe_task(match) ⇒ Object
- #teardown ⇒ Object
Methods included from Roby::Distributed
RemoteProxyModel, call, call_peers, clean_triggered, droby_dump, each_object_relation, each_updated_peer, format, ignore!, neighbours, new_neighbours, notify_new_neighbours, on_neighbour, on_transaction, owns?, peer, peers, process_cycle, process_pending, publish, published?, relations_of, remote_id, remotely_useful_objects, subgraph_of, subscribed?, transmit, trigger, unpublish, update, update_all, updating?, updating_all?
Methods included from Test
assert_any_event_result, #assert_doesnt_timeout, #assert_marshallable, #assert_original_error, check_event_assertions, #display_event_structure, #display_timings!, finalize_event_assertions, interrupt_waiting_threads, #new_plan, #plan, #prepare_plan, #remote_process, #restore_collections, sampling, #save_collection, stats, #stop_remote_processes, #teardown_plan, #wait_thread_stopped
Methods included from Roby
RelationSpace, app, check_failed_missions, condition_variable, control_thread, each_cycle, each_exception_handler, every, execute, filter_backtrace, format_exception, inside_control?, load_all_relations, log_exception, on_exception, once, outside_control?, poll_state_events, pretty_print_backtrace, return_condition_variable, wait_one_cycle, wait_until
Methods included from ExceptionHandlingObject
#handle_exception, #pass_exception
Instance Attribute Details
#central_tuplespace ⇒ Object (readonly)
Returns the value of attribute central_tuplespace.
153 154 155 |
# File 'lib/roby/test/distributed.rb', line 153 def central_tuplespace @central_tuplespace end |
#local ⇒ Object (readonly)
Returns the value of attribute local.
153 154 155 |
# File 'lib/roby/test/distributed.rb', line 153 def local @local end |
#remote ⇒ Object (readonly)
Returns the value of attribute remote.
153 154 155 |
# File 'lib/roby/test/distributed.rb', line 153 def remote @remote end |
#remote_peer ⇒ Object (readonly)
Returns the value of attribute remote_peer.
153 154 155 |
# File 'lib/roby/test/distributed.rb', line 153 def remote_peer @remote_peer end |
#remote_plan ⇒ Object (readonly)
Returns the value of attribute remote_plan.
153 154 155 |
# File 'lib/roby/test/distributed.rb', line 153 def remote_plan @remote_plan end |
Instance Method Details
#peer2peer(detached_control = false, &remote_init) ⇒ Object
Establishes a peer to peer connection between two ConnectionSpace objects
156 157 158 159 160 161 |
# File 'lib/roby/test/distributed.rb', line 156 def peer2peer(detached_control = false, &remote_init) timings[:starting_peers] = Time.now start_peers(detached_control, &remote_init) setup_connection timings[:started_peers] = Time.now end |
#process_events ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/roby/test/distributed.rb', line 163 def process_events if Roby.control.running? remote.wait_one_cycle Roby.control.wait_one_cycle elsif remote_peer && !remote_peer.disconnected? Roby::Control.synchronize do remote.process_events Roby.control.process_events end else super end end |
#remote_server(&block) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/roby/test/distributed.rb', line 200 def remote_server(&block) DRb.stop_service remote_process do server = Class.new do class_eval(&block) end.new DRb.start_service REMOTE_SERVER, server end DRb.start_service LOCAL_SERVER DRbObject.new_with_uri(REMOTE_SERVER) end |
#remote_task(match) ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/roby/test/distributed.rb', line 177 def remote_task(match) set_permanent = match.delete(:permanent) found = nil remote_peer.find_tasks.with_arguments(match).each do |task| assert(!found) if set_permanent plan.permanent(task) end found = if block_given? then yield(task) else task end end found end |
#setup ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/roby/test/distributed.rb', line 10 def setup super save_collection Distributed.new_neighbours_observers @old_distributed_logger_level = Distributed.logger.level timings[:setup] = Time.now # Start the GC so that it does not kick in a test. On slow machines, # it can trigger timeouts GC.start timings[:gc] = Time.now end |
#setup_connection ⇒ Object
143 144 145 146 147 148 149 150 151 |
# File 'lib/roby/test/distributed.rb', line 143 def setup_connection assert(remote_neighbour = local.neighbours.find { true }) Peer.initiate_connection(local, remote_neighbour) do |@remote_peer| end while !remote_peer process_events end assert(remote.send_local_peer(:connected?)) end |
#start_peers(detached_control = false) ⇒ Object
Start a central discovery service, a remote connectionspace and a local connection space. It yields the remote connection space *in the forked child* if a block is given.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/roby/test/distributed.rb', line 94 def start_peers(detached_control = false) DRb.stop_service remote_process do DRb.start_service DISCOVERY_SERVER, Rinda::TupleSpace.new end if detached_control && Roby.control.running? begin Roby.control.quit Roby.control.join rescue ControlQuitError end end remote_process do central_tuplespace = DRbObject.new_with_uri(DISCOVERY_SERVER) cs = ConnectionSpace.new :ring_discovery => false, :discovery_tuplespace => central_tuplespace, :name => "remote" do |remote| getter = Class.new { def get; DRbObject.new(Distributed.state) end }.new DRb.start_service REMOTE_SERVER, getter end cs.extend RemotePeerSupport cs.testcase = self def cs.start_control_thread Control.event_processing << Distributed.state.method(:start_neighbour_discovery) Roby.control.run :detach => true end Distributed.state = cs yield(cs) if block_given? end DRb.start_service LOCAL_SERVER @central_tuplespace = DRbObject.new_with_uri(DISCOVERY_SERVER) @remote = DRbObject.new_with_uri(REMOTE_SERVER).get @local = ConnectionSpace.new :ring_discovery => false, :discovery_tuplespace => central_tuplespace, :name => 'local', :plan => plan Distributed.state = local if detached_control remote.start_control_thread Control.event_processing << Distributed.state.method(:start_neighbour_discovery) Roby.control.run :detach => true end end |
#subscribe_task(match) ⇒ Object
193 194 195 196 197 198 |
# File 'lib/roby/test/distributed.rb', line 193 def subscribe_task(match) remote_task(match) do |task| remote_peer.subscribe(task) task end end |
#teardown ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/roby/test/distributed.rb', line 24 def teardown begin if remote && remote.respond_to?(:cleanup) remote.cleanup end rescue DRb::DRbConnError end super unless Distributed.peers.empty? Roby.warn " still referencing #{Distributed.peers.keys}" Distributed.peers.clear end # This one is a nasty one ... # The main plan is the only thing which remains. If we do not reset # the cached drb_object, it will be kept in the next test and the forked # child will therefore use it ... And it will fail plan.instance_eval do @__droby_remote_id__ = nil @__droby_marshalled__ = nil end if Distributed.state Distributed.state.quit end timings[:end] = Time.now rescue Exception STDERR.puts "failing teardown: #{$!.}" raise ensure Distributed.logger.level = @old_distributed_logger_level end |